Skip to content

Commit b3c015e

Browse files
committed
Reindex from remote
This adds a remote option to reindex that looks like ``` curl -XPOST 'localhost:9200/_reindex?pretty' -d'{ "source": { "remote": { "host": "http://otherhost:9200" }, "index": "target", "query": { "match": { "foo": "bar" } } }, "dest": { "index": "target" } }' ``` This reindex has all of the features of local reindex: * Using queries to filter what is copied * Retry on rejection * Throttle/rethrottle The big advantage of this version is that it goes over the HTTP API which can be made backwards compatible. Some things are different: The query field is sent directly to the other node rather than parsed on the coordinating node. This should allow it to support constructs that are invalid on the coordinating node but are valid on the target node. Mostly, that means old syntax.
1 parent 96f283c commit b3c015e

File tree

71 files changed

+4792
-470
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

71 files changed

+4792
-470
lines changed

core/src/main/java/org/elasticsearch/action/bulk/BackoffPolicy.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,13 @@ public static BackoffPolicy exponentialBackoff(TimeValue initialDelay, int maxNu
8989
return new ExponentialBackoff((int) checkDelay(initialDelay).millis(), maxNumberOfRetries);
9090
}
9191

92+
/**
93+
* Wraps the backoff policy in one that calls a method every time a new backoff is taken from the policy.
94+
*/
95+
public static BackoffPolicy wrap(BackoffPolicy delegate, Runnable onBackoff) {
96+
return new WrappedBackoffPolicy(delegate, onBackoff);
97+
}
98+
9299
private static TimeValue checkDelay(TimeValue delay) {
93100
if (delay.millis() > Integer.MAX_VALUE) {
94101
throw new IllegalArgumentException("delay must be <= " + Integer.MAX_VALUE + " ms");
@@ -200,4 +207,43 @@ public TimeValue next() {
200207
return delay;
201208
}
202209
}
210+
211+
/**
 * A {@link BackoffPolicy} that decorates another policy: every iterator it hands out is
 * wrapped in a {@link WrappedBackoffIterator} built with the same {@code onBackoff} callback.
 */
private static final class WrappedBackoffPolicy extends BackoffPolicy {
212+
private final BackoffPolicy delegate;
213+
private final Runnable onBackoff;
214+
215+
/**
 * @param delegate the policy whose iterators are wrapped
 * @param onBackoff the callback handed to every iterator created by this policy
 */
public WrappedBackoffPolicy(BackoffPolicy delegate, Runnable onBackoff) {
216+
this.delegate = delegate;
217+
this.onBackoff = onBackoff;
218+
}
219+
220+
/**
 * Requests a new iterator from the delegate on every call and wraps it; the callback itself
 * is not run here.
 */
@Override
221+
public Iterator<TimeValue> iterator() {
222+
return new WrappedBackoffIterator(delegate.iterator(), onBackoff);
223+
}
224+
}
225+
226+
/**
 * Iterator over backoff delays that runs a callback each time a delay is actually taken from
 * the underlying iterator.
 */
private static final class WrappedBackoffIterator implements Iterator<TimeValue> {
227+
private final Iterator<TimeValue> delegate;
228+
private final Runnable onBackoff;
229+
230+
/**
 * @param delegate the iterator that supplies the delays
 * @param onBackoff the callback run by {@link #next()} before a delay is returned
 */
public WrappedBackoffIterator(Iterator<TimeValue> delegate, Runnable onBackoff) {
231+
this.delegate = delegate;
232+
this.onBackoff = onBackoff;
233+
}
234+
235+
/**
 * Pure delegation: asking whether another delay exists never runs the callback.
 */
@Override
236+
public boolean hasNext() {
237+
return delegate.hasNext();
238+
}
239+
240+
/**
 * Runs the callback and then returns the delegate's next delay.
 *
 * @throws NoSuchElementException if the delegate has no more delays; the callback is not run
 *         in that case
 */
@Override
241+
public TimeValue next() {
242+
// Check for exhaustion first so the callback only runs when a backoff is really handed out.
if (false == delegate.hasNext()) {
243+
throw new NoSuchElementException();
244+
}
245+
onBackoff.run();
246+
return delegate.next();
247+
}
248+
}
203249
}

core/src/main/java/org/elasticsearch/common/xcontent/ObjectParser.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -413,6 +413,7 @@ public enum ValueType {
413413
STRING(VALUE_STRING),
414414
STRING_OR_NULL(VALUE_STRING, VALUE_NULL),
415415
FLOAT(VALUE_NUMBER, VALUE_STRING),
416+
FLOAT_OR_NULL(VALUE_NUMBER, VALUE_STRING, VALUE_NULL),
416417
DOUBLE(VALUE_NUMBER, VALUE_STRING),
417418
LONG(VALUE_NUMBER, VALUE_STRING),
418419
INT(VALUE_NUMBER, VALUE_STRING),
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.action.bulk;
21+
22+
import org.elasticsearch.common.unit.TimeValue;
23+
import org.elasticsearch.test.ESTestCase;
24+
25+
import java.util.Iterator;
26+
import java.util.NoSuchElementException;
27+
import java.util.concurrent.atomic.AtomicInteger;
28+
29+
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
30+
31+
/**
 * Tests for the callback-invoking wrapper returned by {@code BackoffPolicy.wrap}.
 */
public class BackoffPolicyTests extends ESTestCase {
32+
/**
 * Wraps a constant backoff policy with a callback that increments a counter and checks when
 * the counter moves: not when the iterator is fetched, not on {@code hasNext()}, once per
 * successful {@code next()}, and not when {@code next()} throws
 * {@link NoSuchElementException}. A second iterator from the same policy must also invoke the
 * callback.
 */
public void testWrapBackoffPolicy() {
33+
TimeValue timeValue = timeValueMillis(between(0, Integer.MAX_VALUE));
34+
int maxNumberOfRetries = between(1, 1000);
35+
BackoffPolicy policy = BackoffPolicy.constantBackoff(timeValue, maxNumberOfRetries);
36+
AtomicInteger retries = new AtomicInteger();
37+
policy = BackoffPolicy.wrap(policy, retries::getAndIncrement);
38+
39+
// Running tally of how many times the callback should have fired so far.
int expectedRetries = 0;
40+
{
41+
// Fetching the iterator doesn't call the callback
42+
Iterator<TimeValue> itr = policy.iterator();
43+
assertEquals(expectedRetries, retries.get());
44+
45+
while (itr.hasNext()) {
46+
// hasNext doesn't trigger the callback
47+
assertEquals(expectedRetries, retries.get());
48+
// next does
49+
itr.next();
50+
expectedRetries += 1;
51+
assertEquals(expectedRetries, retries.get());
52+
}
53+
// next doesn't call the callback when there isn't a backoff available
54+
expectThrows(NoSuchElementException.class, () -> itr.next());
55+
assertEquals(expectedRetries, retries.get());
56+
}
57+
{
58+
// The second iterator also calls the callback
59+
Iterator<TimeValue> itr = policy.iterator();
60+
itr.next();
61+
expectedRetries += 1;
62+
assertEquals(expectedRetries, retries.get());
63+
}
64+
}
65+
}

docs/build.gradle

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ integTest {
3030
configFile 'scripts/my_script.py'
3131
configFile 'userdict_ja.txt'
3232
configFile 'KeywordTokenizer.rbbi'
33+
// Whitelist reindexing from the local node so we can test it.
34+
setting 'reindex.remote.whitelist', 'myself'
3335
}
3436
}
3537

@@ -81,3 +83,15 @@ Closure setupTwitter = { String name, int count ->
8183
}
8284
setupTwitter('twitter', 5)
8385
setupTwitter('big_twitter', 120)
86+
87+
buildRestTests.setups['host'] = '''
88+
# Fetch the http host. We use the host of the master because we know there will always be a master.
89+
- do:
90+
cluster.state: {}
91+
- set: { master_node: master }
92+
- do:
93+
nodes.info:
94+
metric: [ http ]
95+
- is_true: nodes.$master.http.publish_address
96+
- set: {nodes.$master.http.publish_address: host}
97+
'''

docs/reference/docs/reindex.asciidoc

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -361,6 +361,60 @@ POST _reindex
361361
// CONSOLE
362362
// TEST[s/^/PUT source\nGET _cluster\/health?wait_for_status=yellow\n/]
363363

364+
[float]
365+
=== Reindex from Remote
366+
367+
Reindex supports reindexing from a remote Elasticsearch cluster:
368+
369+
[source,js]
370+
--------------------------------------------------
371+
POST _reindex
372+
{
373+
"source": {
374+
"remote": {
375+
"host": "http://otherhost:9200",
376+
"username": "user",
377+
"password": "pass"
378+
},
379+
"index": "source",
380+
"query": {
381+
"match": {
382+
"test": "data"
383+
}
384+
}
385+
},
386+
"dest": {
387+
"index": "dest"
388+
}
389+
}
390+
--------------------------------------------------
391+
// CONSOLE
392+
// TEST[setup:host]
393+
// TEST[s/^/PUT source\nGET _cluster\/health?wait_for_status=yellow\n/]
394+
// TEST[s/otherhost:9200",/\${host}"/]
395+
// TEST[s/"username": "user",//]
396+
// TEST[s/"password": "pass"//]
397+
398+
The `host` parameter must contain a scheme, host, and port (e.g.
399+
`https://otherhost:9200`). The `username` and `password` parameters are
400+
optional and when they are present reindex will connect to the remote
401+
Elasticsearch node using basic auth. Be sure to use `https` when using
402+
basic auth or the password will be sent in plain text.
403+
404+
Remote hosts have to be explicitly whitelisted in elasticsearch.yml using the
405+
`reindex.remote.whitelist` property. It can be set to a comma delimited list
406+
of allowed remote `host` and `port` combinations (e.g.
407+
`otherhost:9200, another:9200`). Scheme is ignored by the whitelist - only host
408+
and port are used.
409+
410+
This feature should work with remote clusters of any version of Elasticsearch
411+
you are likely to find. This should allow you to upgrade from any version of
412+
Elasticsearch to the current version by reindexing from a cluster of the old
413+
version.
414+
415+
To enable queries sent to older versions of Elasticsearch the `query` parameter
416+
is sent directly to the remote host without validation or modification.
417+
364418
[float]
365419
=== URL Parameters
366420

modules/reindex/build.gradle

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,3 +21,41 @@ esplugin {
2121
description 'The Reindex module adds APIs to reindex from one index to another or update documents in place.'
2222
classname 'org.elasticsearch.index.reindex.ReindexPlugin'
2323
}
24+
25+
integTest {
26+
cluster {
27+
// Whitelist reindexing from the local node so we can test it.
28+
setting 'reindex.remote.whitelist', 'myself'
29+
}
30+
}
31+
32+
run {
33+
// Whitelist reindexing from the local node so we can test it.
34+
setting 'reindex.remote.whitelist', 'myself'
35+
}
36+
37+
38+
dependencies {
39+
compile "org.elasticsearch.client:rest:${version}"
40+
// dependencies of the rest client
41+
compile "org.apache.httpcomponents:httpclient:${versions.httpclient}"
42+
compile "org.apache.httpcomponents:httpcore:${versions.httpcore}"
43+
compile "commons-codec:commons-codec:${versions.commonscodec}"
44+
compile "commons-logging:commons-logging:${versions.commonslogging}"
45+
}
46+
47+
dependencyLicenses {
48+
// Don't check the client's license. We know it.
49+
dependencies = project.configurations.runtime.fileCollection {
50+
it.group.startsWith('org.elasticsearch') == false
51+
} - project.configurations.provided
52+
}
53+
54+
thirdPartyAudit.excludes = [
55+
// Commons logging
56+
'javax.servlet.ServletContextEvent',
57+
'javax.servlet.ServletContextListener',
58+
'org.apache.avalon.framework.logger.Logger',
59+
'org.apache.log.Hierarchy',
60+
'org.apache.log.Logger',
61+
]
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
4b95f4897fa13f2cd904aee711aeafc0c5295cd8

0 commit comments

Comments
 (0)