Skip to content

Commit ad38b09

Browse files
authored
Introduce ssl settings to reindex from remote (elastic#38292)
Adds reindex.ssl.* settings for reindex from remote. This uses the ssl-config/ internal library to parse and load SSL configuration and files. This is applied when using the low level rest client to connect to a remote ES node Backport of: elastic#37527 Relates: elastic#37287 Resolves: elastic#29755
1 parent 502c3d8 commit ad38b09

25 files changed

+646
-38
lines changed

build.gradle

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ allprojects {
206206
}
207207

208208
/* Sets up the dependencies that we build as part of this project but
209-
register as thought they were external to resolve internally. We register
209+
register as though they were external to resolve internally. We register
210210
them as external dependencies so the build plugin that we use can be used
211211
to build elasticsearch plugins outside of the elasticsearch source tree. */
212212
ext.projectSubstitutions = [
@@ -217,6 +217,7 @@ allprojects {
217217
"org.elasticsearch:elasticsearch-core:${version}": ':libs:core',
218218
"org.elasticsearch:elasticsearch-x-content:${version}": ':libs:x-content',
219219
"org.elasticsearch:elasticsearch-secure-sm:${version}": ':libs:secure-sm',
220+
"org.elasticsearch:elasticsearch-ssl-config:${version}": ':libs:elasticsearch-ssl-config',
220221
"org.elasticsearch.client:elasticsearch-rest-client:${version}": ':client:rest',
221222
"org.elasticsearch.client:elasticsearch-rest-client-sniffer:${version}": ':client:sniffer',
222223
"org.elasticsearch.client:elasticsearch-rest-high-level-client:${version}": ':client:rest-high-level',

modules/reindex/build.gradle

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ unitTest {
5656

5757
dependencies {
5858
compile "org.elasticsearch.client:elasticsearch-rest-client:${version}"
59+
compile "org.elasticsearch:elasticsearch-ssl-config:${version}"
5960
// for http - testing reindex from remote
6061
testCompile project(path: ':modules:transport-netty4', configuration: 'runtime')
6162
// for parent/child testing
@@ -71,6 +72,11 @@ thirdPartyAudit.ignoreMissingClasses (
7172
'org.apache.log.Logger',
7273
)
7374

75+
forbiddenPatterns {
76+
// PKCS#12 file are not UTF-8
77+
exclude '**/*.p12'
78+
}
79+
7480
// Support for testing reindex-from-remote against old Elaticsearch versions
7581
configurations {
7682
oldesFixture

modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.elasticsearch.action.bulk.Retry;
3535
import org.elasticsearch.action.delete.DeleteRequest;
3636
import org.elasticsearch.action.index.IndexRequest;
37+
import org.elasticsearch.action.support.TransportAction;
3738
import org.elasticsearch.client.ParentTaskAssigningClient;
3839
import org.elasticsearch.common.unit.ByteSizeValue;
3940
import org.elasticsearch.common.unit.TimeValue;
@@ -83,13 +84,15 @@
8384
* Abstract base for scrolling across a search and executing bulk actions on all results. All package private methods are package private so
8485
* their tests can use them. Most methods run in the listener thread pool because the are meant to be fast and don't expect to block.
8586
*/
86-
public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBulkByScrollRequest<Request>> {
87+
public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBulkByScrollRequest<Request>,
88+
Action extends TransportAction<Request, ?>> {
89+
8790
protected final Logger logger;
8891
protected final BulkByScrollTask task;
8992
protected final WorkerBulkByScrollTaskState worker;
9093
protected final ThreadPool threadPool;
91-
protected final ScriptService scriptService;
9294

95+
protected final Action mainAction;
9396
/**
9497
* The request for this action. Named mainRequest because we create lots of <code>request</code> variables all representing child
9598
* requests of this mainRequest.
@@ -113,7 +116,7 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
113116

114117
public AbstractAsyncBulkByScrollAction(BulkByScrollTask task, boolean needsSourceDocumentVersions,
115118
boolean needsSourceDocumentSeqNoAndPrimaryTerm, Logger logger, ParentTaskAssigningClient client,
116-
ThreadPool threadPool, Request mainRequest, ScriptService scriptService,
119+
ThreadPool threadPool, Action mainAction, Request mainRequest,
117120
ActionListener<BulkByScrollResponse> listener) {
118121

119122
this.task = task;
@@ -125,7 +128,7 @@ public AbstractAsyncBulkByScrollAction(BulkByScrollTask task, boolean needsSourc
125128
this.logger = logger;
126129
this.client = client;
127130
this.threadPool = threadPool;
128-
this.scriptService = scriptService;
131+
this.mainAction = mainAction;
129132
this.mainRequest = mainRequest;
130133
this.listener = listener;
131134
BackoffPolicy backoffPolicy = buildBackoffPolicy();

modules/reindex/src/main/java/org/elasticsearch/index/reindex/AsyncDeleteByQueryAction.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,22 +31,22 @@
3131
/**
3232
* Implementation of delete-by-query using scrolling and bulk.
3333
*/
34-
public class AsyncDeleteByQueryAction extends AbstractAsyncBulkByScrollAction<DeleteByQueryRequest> {
34+
public class AsyncDeleteByQueryAction extends AbstractAsyncBulkByScrollAction<DeleteByQueryRequest, TransportDeleteByQueryAction> {
35+
3536
private final boolean useSeqNoForCAS;
3637

3738
public AsyncDeleteByQueryAction(BulkByScrollTask task, Logger logger, ParentTaskAssigningClient client,
38-
ThreadPool threadPool, DeleteByQueryRequest request, ScriptService scriptService,
39-
ClusterState clusterState, ActionListener<BulkByScrollResponse> listener) {
39+
ThreadPool threadPool, TransportDeleteByQueryAction action, DeleteByQueryRequest request,
40+
ScriptService scriptService, ClusterState clusterState, ActionListener<BulkByScrollResponse> listener) {
4041
super(task,
4142
// not all nodes support sequence number powered optimistic concurrency control, we fall back to version
4243
clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0) == false,
4344
// all nodes support sequence number powered optimistic concurrency control and we can use it
4445
clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0),
45-
logger, client, threadPool, request, scriptService, listener);
46+
logger, client, threadPool, action, request, listener);
4647
useSeqNoForCAS = clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0);
4748
}
4849

49-
5050
@Override
5151
protected boolean accept(ScrollableHitSource.Hit doc) {
5252
// Delete-by-query does not require the source to delete a document

modules/reindex/src/main/java/org/elasticsearch/index/reindex/ReindexPlugin.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,21 +21,32 @@
2121

2222
import org.elasticsearch.action.ActionRequest;
2323
import org.elasticsearch.action.ActionResponse;
24+
import org.elasticsearch.client.Client;
2425
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
2526
import org.elasticsearch.cluster.node.DiscoveryNodes;
27+
import org.elasticsearch.cluster.service.ClusterService;
2628
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
2729
import org.elasticsearch.common.settings.ClusterSettings;
2830
import org.elasticsearch.common.settings.IndexScopedSettings;
2931
import org.elasticsearch.common.settings.Setting;
3032
import org.elasticsearch.common.settings.Settings;
3133
import org.elasticsearch.common.settings.SettingsFilter;
34+
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
35+
import org.elasticsearch.env.Environment;
36+
import org.elasticsearch.env.NodeEnvironment;
3237
import org.elasticsearch.plugins.ActionPlugin;
3338
import org.elasticsearch.plugins.Plugin;
3439
import org.elasticsearch.rest.RestController;
3540
import org.elasticsearch.rest.RestHandler;
41+
import org.elasticsearch.script.ScriptService;
3642
import org.elasticsearch.tasks.Task;
43+
import org.elasticsearch.threadpool.ThreadPool;
44+
import org.elasticsearch.watcher.ResourceWatcherService;
3745

46+
import java.util.ArrayList;
3847
import java.util.Arrays;
48+
import java.util.Collection;
49+
import java.util.Collections;
3950
import java.util.List;
4051
import java.util.function.Supplier;
4152

@@ -69,8 +80,19 @@ public List<RestHandler> getRestHandlers(Settings settings, RestController restC
6980
new RestRethrottleAction(settings, restController, nodesInCluster));
7081
}
7182

83+
@Override
84+
public Collection<Object> createComponents(Client client, ClusterService clusterService, ThreadPool threadPool,
85+
ResourceWatcherService resourceWatcherService, ScriptService scriptService,
86+
NamedXContentRegistry xContentRegistry, Environment environment,
87+
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry) {
88+
return Collections.singletonList(new ReindexSslConfig(environment.settings(), environment, resourceWatcherService));
89+
}
90+
7291
@Override
7392
public List<Setting<?>> getSettings() {
74-
return singletonList(TransportReindexAction.REMOTE_CLUSTER_WHITELIST);
93+
final List<Setting<?>> settings = new ArrayList<>();
94+
settings.add(TransportReindexAction.REMOTE_CLUSTER_WHITELIST);
95+
settings.addAll(ReindexSslConfig.getSettings());
96+
return settings;
7597
}
7698
}
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.index.reindex;
21+
22+
import org.apache.http.conn.ssl.DefaultHostnameVerifier;
23+
import org.apache.http.conn.ssl.NoopHostnameVerifier;
24+
import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
25+
import org.elasticsearch.common.Strings;
26+
import org.elasticsearch.common.settings.SecureSetting;
27+
import org.elasticsearch.common.settings.SecureString;
28+
import org.elasticsearch.common.settings.Setting;
29+
import org.elasticsearch.common.settings.Settings;
30+
import org.elasticsearch.common.ssl.SslConfiguration;
31+
import org.elasticsearch.common.ssl.SslConfigurationKeys;
32+
import org.elasticsearch.common.ssl.SslConfigurationLoader;
33+
import org.elasticsearch.env.Environment;
34+
import org.elasticsearch.watcher.FileChangesListener;
35+
import org.elasticsearch.watcher.FileWatcher;
36+
import org.elasticsearch.watcher.ResourceWatcherService;
37+
38+
import javax.net.ssl.HostnameVerifier;
39+
import javax.net.ssl.SSLContext;
40+
import java.io.IOException;
41+
import java.io.UncheckedIOException;
42+
import java.nio.file.Path;
43+
import java.util.ArrayList;
44+
import java.util.Collections;
45+
import java.util.HashMap;
46+
import java.util.List;
47+
import java.util.Map;
48+
import java.util.function.Function;
49+
50+
import static org.elasticsearch.common.settings.Setting.listSetting;
51+
import static org.elasticsearch.common.settings.Setting.simpleString;
52+
53+
/**
54+
* Loads "reindex.ssl.*" configuration from Settings, and makes the applicable configuration (trust manager / key manager / hostname
55+
* verification / cipher-suites) available for reindex-from-remote.
56+
*/
57+
class ReindexSslConfig {
58+
59+
private static final Map<String, Setting<?>> SETTINGS = new HashMap<>();
60+
private static final Map<String, Setting<SecureString>> SECURE_SETTINGS = new HashMap<>();
61+
62+
static {
63+
Setting.Property[] defaultProperties = new Setting.Property[] { Setting.Property.NodeScope, Setting.Property.Filtered };
64+
Setting.Property[] deprecatedProperties = new Setting.Property[] { Setting.Property.Deprecated, Setting.Property.NodeScope,
65+
Setting.Property.Filtered };
66+
for (String key : SslConfigurationKeys.getStringKeys()) {
67+
String settingName = "reindex.ssl." + key;
68+
final Setting.Property[] properties = SslConfigurationKeys.isDeprecated(key) ? deprecatedProperties : defaultProperties;
69+
SETTINGS.put(settingName, simpleString(settingName, properties));
70+
}
71+
for (String key : SslConfigurationKeys.getListKeys()) {
72+
String settingName = "reindex.ssl." + key;
73+
final Setting.Property[] properties = SslConfigurationKeys.isDeprecated(key) ? deprecatedProperties : defaultProperties;
74+
SETTINGS.put(settingName, listSetting(settingName, Collections.emptyList(), Function.identity(), properties));
75+
}
76+
for (String key : SslConfigurationKeys.getSecureStringKeys()) {
77+
String settingName = "reindex.ssl." + key;
78+
SECURE_SETTINGS.put(settingName, SecureSetting.secureString(settingName, null));
79+
}
80+
}
81+
82+
private final SslConfiguration configuration;
83+
private volatile SSLContext context;
84+
85+
public static List<Setting<?>> getSettings() {
86+
List<Setting<?>> settings = new ArrayList<>();
87+
settings.addAll(SETTINGS.values());
88+
settings.addAll(SECURE_SETTINGS.values());
89+
return settings;
90+
}
91+
92+
ReindexSslConfig(Settings settings, Environment environment, ResourceWatcherService resourceWatcher) {
93+
final SslConfigurationLoader loader = new SslConfigurationLoader("reindex.ssl.") {
94+
95+
@Override
96+
protected String getSettingAsString(String key) {
97+
return settings.get(key);
98+
}
99+
100+
@Override
101+
protected char[] getSecureSetting(String key) {
102+
final Setting<SecureString> setting = SECURE_SETTINGS.get(key);
103+
if (setting == null) {
104+
throw new IllegalArgumentException("The secure setting [" + key + "] is not registered");
105+
}
106+
return setting.get(settings).getChars();
107+
}
108+
109+
@Override
110+
protected List<String> getSettingAsList(String key) throws Exception {
111+
return settings.getAsList(key);
112+
}
113+
};
114+
configuration = loader.load(environment.configFile());
115+
reload();
116+
117+
final FileChangesListener listener = new FileChangesListener() {
118+
@Override
119+
public void onFileCreated(Path file) {
120+
onFileChanged(file);
121+
}
122+
123+
@Override
124+
public void onFileDeleted(Path file) {
125+
onFileChanged(file);
126+
}
127+
128+
@Override
129+
public void onFileChanged(Path file) {
130+
ReindexSslConfig.this.reload();
131+
}
132+
};
133+
for (Path file : configuration.getDependentFiles()) {
134+
try {
135+
final FileWatcher watcher = new FileWatcher(file);
136+
watcher.addListener(listener);
137+
resourceWatcher.add(watcher, ResourceWatcherService.Frequency.HIGH);
138+
} catch (IOException e) {
139+
throw new UncheckedIOException("cannot watch file [" + file + "]", e);
140+
}
141+
}
142+
}
143+
144+
private void reload() {
145+
this.context = configuration.createSslContext();
146+
}
147+
148+
/**
149+
* Encapsulate the loaded SSL configuration as a HTTP-client {@link SSLIOSessionStrategy}.
150+
* The returned strategy is immutable, but successive calls will return different objects that may have different
151+
* configurations if the underlying key/certificate files are modified.
152+
*/
153+
SSLIOSessionStrategy getStrategy() {
154+
final HostnameVerifier hostnameVerifier = configuration.getVerificationMode().isHostnameVerificationEnabled()
155+
? new DefaultHostnameVerifier()
156+
: new NoopHostnameVerifier();
157+
final String[] protocols = configuration.getSupportedProtocols().toArray(Strings.EMPTY_ARRAY);
158+
final String[] cipherSuites = configuration.getCipherSuites().toArray(Strings.EMPTY_ARRAY);
159+
return new SSLIOSessionStrategy(context, protocols, cipherSuites, hostnameVerifier);
160+
}
161+
}

modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportDeleteByQueryAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public void doExecute(Task task, DeleteByQueryRequest request, ActionListener<Bu
5858
ClusterState state = clusterService.state();
5959
ParentTaskAssigningClient assigningClient = new ParentTaskAssigningClient(client, clusterService.localNode(),
6060
bulkByScrollTask);
61-
new AsyncDeleteByQueryAction(bulkByScrollTask, logger, assigningClient, threadPool, request, scriptService, state,
61+
new AsyncDeleteByQueryAction(bulkByScrollTask, logger, assigningClient, threadPool, this, request, scriptService, state,
6262
listener).start();
6363
}
6464
);

0 commit comments

Comments
 (0)