Skip to content

Reindex from remote #18585

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 5, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,13 @@ public static BackoffPolicy exponentialBackoff(TimeValue initialDelay, int maxNu
return new ExponentialBackoff((int) checkDelay(initialDelay).millis(), maxNumberOfRetries);
}

/**
* Wraps the backoff policy in one that calls a method every time a new backoff is taken from the policy.
*/
public static BackoffPolicy wrap(BackoffPolicy delegate, Runnable onBackoff) {
return new WrappedBackoffPolicy(delegate, onBackoff);
}

private static TimeValue checkDelay(TimeValue delay) {
if (delay.millis() > Integer.MAX_VALUE) {
throw new IllegalArgumentException("delay must be <= " + Integer.MAX_VALUE + " ms");
Expand Down Expand Up @@ -200,4 +207,43 @@ public TimeValue next() {
return delay;
}
}

private static final class WrappedBackoffPolicy extends BackoffPolicy {
private final BackoffPolicy delegate;
private final Runnable onBackoff;

public WrappedBackoffPolicy(BackoffPolicy delegate, Runnable onBackoff) {
this.delegate = delegate;
this.onBackoff = onBackoff;
}

@Override
public Iterator<TimeValue> iterator() {
return new WrappedBackoffIterator(delegate.iterator(), onBackoff);
}
}

private static final class WrappedBackoffIterator implements Iterator<TimeValue> {
private final Iterator<TimeValue> delegate;
private final Runnable onBackoff;

public WrappedBackoffIterator(Iterator<TimeValue> delegate, Runnable onBackoff) {
this.delegate = delegate;
this.onBackoff = onBackoff;
}

@Override
public boolean hasNext() {
return delegate.hasNext();
}

@Override
public TimeValue next() {
if (false == delegate.hasNext()) {
throw new NoSuchElementException();
}
onBackoff.run();
return delegate.next();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,7 @@ public enum ValueType {
STRING(VALUE_STRING),
STRING_OR_NULL(VALUE_STRING, VALUE_NULL),
FLOAT(VALUE_NUMBER, VALUE_STRING),
FLOAT_OR_NULL(VALUE_NUMBER, VALUE_STRING, VALUE_NULL),
DOUBLE(VALUE_NUMBER, VALUE_STRING),
LONG(VALUE_NUMBER, VALUE_STRING),
INT(VALUE_NUMBER, VALUE_STRING),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.action.bulk;

import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase;

import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicInteger;

import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;

public class BackoffPolicyTests extends ESTestCase {
public void testWrapBackoffPolicy() {
TimeValue timeValue = timeValueMillis(between(0, Integer.MAX_VALUE));
int maxNumberOfRetries = between(1, 1000);
BackoffPolicy policy = BackoffPolicy.constantBackoff(timeValue, maxNumberOfRetries);
AtomicInteger retries = new AtomicInteger();
policy = BackoffPolicy.wrap(policy, retries::getAndIncrement);

int expectedRetries = 0;
{
// Fetching the iterator doesn't call the callback
Iterator<TimeValue> itr = policy.iterator();
assertEquals(expectedRetries, retries.get());

while (itr.hasNext()) {
// hasNext doesn't trigger the callback
assertEquals(expectedRetries, retries.get());
// next does
itr.next();
expectedRetries += 1;
assertEquals(expectedRetries, retries.get());
}
// next doesn't call the callback when there isn't a backoff available
expectThrows(NoSuchElementException.class, () -> itr.next());
assertEquals(expectedRetries, retries.get());
}
{
// The second iterator also calls the callback
Iterator<TimeValue> itr = policy.iterator();
itr.next();
expectedRetries += 1;
assertEquals(expectedRetries, retries.get());
}
}
}
14 changes: 14 additions & 0 deletions docs/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ integTest {
configFile 'scripts/my_script.py'
configFile 'userdict_ja.txt'
configFile 'KeywordTokenizer.rbbi'
// Whitelist reindexing from the local node so we can test it.
setting 'reindex.remote.whitelist', 'myself'
}
}

Expand Down Expand Up @@ -81,3 +83,15 @@ Closure setupTwitter = { String name, int count ->
}
setupTwitter('twitter', 5)
setupTwitter('big_twitter', 120)

buildRestTests.setups['host'] = '''
# Fetch the http host. We use the host of the master because we know there will always be a master.
- do:
cluster.state: {}
- set: { master_node: master }
- do:
nodes.info:
metric: [ http ]
- is_true: nodes.$master.http.publish_address
- set: {nodes.$master.http.publish_address: host}
'''
54 changes: 54 additions & 0 deletions docs/reference/docs/reindex.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,60 @@ POST _reindex
// CONSOLE
// TEST[s/^/PUT source\nGET _cluster\/health?wait_for_status=yellow\n/]

[float]
=== Reindex from Remote

Reindex supports reindexing from a remote Elasticsearch cluster:

[source,js]
--------------------------------------------------
POST _reindex
{
"source": {
"remote": {
"host": "http://otherhost:9200",
"username": "user",
"password": "pass"
},
"index": "source",
"query": {
"match": {
"test": "data"
}
}
},
"dest": {
"index": "dest"
}
}
--------------------------------------------------
// CONSOLE
// TEST[setup:host]
// TEST[s/^/PUT source\nGET _cluster\/health?wait_for_status=yellow\n/]
// TEST[s/otherhost:9200",/\${host}"/]
// TEST[s/"username": "user",//]
// TEST[s/"password": "pass"//]

The `host` parameter must contain a scheme, host, and port (e.g.
`https://otherhost:9200`). The `username` and `password` parameters are
optional and when they are present reindex will connect to the remote
Elasticsearch node using using basic auth. Be sure to use `https` when using
basic auth or the password will be sent in plain text.

Remote hosts have to be explicitly whitelisted in elasticsearch.yaml using the
`reindex.remote.whitelist` property. It can be set to a comma delimited list
of allowed remote `host` and `port` combinations (e.g.
`otherhost:9200, another:9200`). Scheme is ignored by the whitelist - only host
and port are used.

This feature should work with remote clusters of any version of Elasticsearch
you are likely to find. This should allow you to upgrade from any version of
Elasticsearch to the current version by reindexing from a cluster of the old
version.

To enable queries sent to older versions of Elasticsearch the `query` parameter
is sent directly to the remote host without validation or modification.

[float]
=== URL Parameters

Expand Down
38 changes: 38 additions & 0 deletions modules/reindex/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,41 @@ esplugin {
description 'The Reindex module adds APIs to reindex from one index to another or update documents in place.'
classname 'org.elasticsearch.index.reindex.ReindexPlugin'
}

integTest {
cluster {
// Whitelist reindexing from the local node so we can test it.
setting 'reindex.remote.whitelist', 'myself'
}
}

run {
// Whitelist reindexing from the local node so we can test it.
setting 'reindex.remote.whitelist', 'myself'
}


dependencies {
compile "org.elasticsearch.client:rest:${version}"
// dependencies of the rest client
compile "org.apache.httpcomponents:httpclient:${versions.httpclient}"
compile "org.apache.httpcomponents:httpcore:${versions.httpcore}"
compile "commons-codec:commons-codec:${versions.commonscodec}"
compile "commons-logging:commons-logging:${versions.commonslogging}"
}

dependencyLicenses {
// Don't check the client's license. We know it.
dependencies = project.configurations.runtime.fileCollection {
it.group.startsWith('org.elasticsearch') == false
} - project.configurations.provided
}

thirdPartyAudit.excludes = [
// Commons logging
'javax.servlet.ServletContextEvent',
'javax.servlet.ServletContextListener',
'org.apache.avalon.framework.logger.Logger',
'org.apache.log.Hierarchy',
'org.apache.log.Logger',
]
1 change: 1 addition & 0 deletions modules/reindex/licenses/commons-codec-1.10.jar.sha1
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
4b95f4897fa13f2cd904aee711aeafc0c5295cd8
Loading