Skip to content

Commit 694e2a9

Browse files
authored
Add remote cluster client (#29495)
This change adds a client that is connected to a remote cluster. This allows plugins and internal structures to invoke actions on remote clusters just like a if it's a local cluster. The remote cluster must be configured via the cross cluster search infrastructure.
1 parent eab530c commit 694e2a9

File tree

8 files changed

+204
-3
lines changed

8 files changed

+204
-3
lines changed

server/src/main/java/org/elasticsearch/client/Client.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -477,4 +477,14 @@ public interface Client extends ElasticsearchClient, Releasable {
477477
* issued from it.
478478
*/
479479
Client filterWithHeader(Map<String, String> headers);
480+
481+
/**
482+
* Returns a client to a remote cluster with the given cluster alias.
483+
*
484+
* @throws IllegalArgumentException if the given clusterAlias doesn't exist
485+
* @throws UnsupportedOperationException if this functionality is not available on this client.
486+
*/
487+
default Client getRemoteClusterClient(String clusterAlias) {
488+
throw new UnsupportedOperationException("this client doesn't support remote cluster connections");
489+
}
480490
}

server/src/main/java/org/elasticsearch/client/FilterClient.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,4 +73,9 @@ protected <Request extends ActionRequest, Response extends ActionResponse, Reque
7373
protected Client in() {
7474
return in;
7575
}
76+
77+
@Override
78+
public Client getRemoteClusterClient(String clusterAlias) {
79+
return in.getRemoteClusterClient(clusterAlias);
80+
}
7681
}

server/src/main/java/org/elasticsearch/client/node/NodeClient.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.elasticsearch.tasks.Task;
3434
import org.elasticsearch.tasks.TaskListener;
3535
import org.elasticsearch.threadpool.ThreadPool;
36+
import org.elasticsearch.transport.RemoteClusterService;
3637

3738
import java.util.Map;
3839
import java.util.function.Supplier;
@@ -48,14 +49,17 @@ public class NodeClient extends AbstractClient {
4849
* {@link #executeLocally(GenericAction, ActionRequest, TaskListener)}.
4950
*/
5051
private Supplier<String> localNodeId;
52+
private RemoteClusterService remoteClusterService;
5153

5254
public NodeClient(Settings settings, ThreadPool threadPool) {
5355
super(settings, threadPool);
5456
}
5557

56-
public void initialize(Map<GenericAction, TransportAction> actions, Supplier<String> localNodeId) {
58+
public void initialize(Map<GenericAction, TransportAction> actions, Supplier<String> localNodeId,
59+
RemoteClusterService remoteClusterService) {
5760
this.actions = actions;
5861
this.localNodeId = localNodeId;
62+
this.remoteClusterService = remoteClusterService;
5963
}
6064

6165
@Override
@@ -117,4 +121,9 @@ > TransportAction<Request, Response> transportAction(GenericAction<Request, Resp
117121
}
118122
return transportAction;
119123
}
124+
125+
@Override
126+
public Client getRemoteClusterClient(String clusterAlias) {
127+
return remoteClusterService.getRemoteClusterClient(threadPool(), clusterAlias);
128+
}
120129
}

server/src/main/java/org/elasticsearch/node/Node.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -538,7 +538,7 @@ protected Node(final Environment environment, Collection<Class<? extends Plugin>
538538
resourcesToClose.addAll(pluginLifecycleComponents);
539539
this.pluginLifecycleComponents = Collections.unmodifiableList(pluginLifecycleComponents);
540540
client.initialize(injector.getInstance(new Key<Map<GenericAction, TransportAction>>() {}),
541-
() -> clusterService.localNode().getId());
541+
() -> clusterService.localNode().getId(), transportService.getRemoteClusterService());
542542

543543
if (NetworkModule.HTTP_ENABLED.get(settings)) {
544544
logger.debug("initializing HTTP handlers ...");
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.transport;
20+
21+
import org.elasticsearch.action.Action;
22+
import org.elasticsearch.action.ActionListener;
23+
import org.elasticsearch.action.ActionListenerResponseHandler;
24+
import org.elasticsearch.action.ActionRequest;
25+
import org.elasticsearch.action.ActionRequestBuilder;
26+
import org.elasticsearch.action.ActionResponse;
27+
import org.elasticsearch.client.Client;
28+
import org.elasticsearch.client.support.AbstractClient;
29+
import org.elasticsearch.common.settings.Settings;
30+
import org.elasticsearch.threadpool.ThreadPool;
31+
32+
final class RemoteClusterAwareClient extends AbstractClient {
33+
34+
private final TransportService service;
35+
private final String clusterAlias;
36+
private final RemoteClusterService remoteClusterService;
37+
38+
RemoteClusterAwareClient(Settings settings, ThreadPool threadPool, TransportService service, String clusterAlias) {
39+
super(settings, threadPool);
40+
this.service = service;
41+
this.clusterAlias = clusterAlias;
42+
this.remoteClusterService = service.getRemoteClusterService();
43+
}
44+
45+
@Override
46+
protected <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends
47+
ActionRequestBuilder<Request, Response, RequestBuilder>>
48+
void doExecute(Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) {
49+
remoteClusterService.ensureConnected(clusterAlias, ActionListener.wrap(res -> {
50+
Transport.Connection connection = remoteClusterService.getConnection(clusterAlias);
51+
service.sendRequest(connection, action.name(), request, TransportRequestOptions.EMPTY,
52+
new ActionListenerResponseHandler<>(listener, action::newResponse));
53+
},
54+
listener::onFailure));
55+
}
56+
57+
58+
@Override
59+
public void close() {
60+
// do nothing
61+
}
62+
63+
@Override
64+
public Client getRemoteClusterClient(String clusterAlias) {
65+
return remoteClusterService.getRemoteClusterClient(threadPool(), clusterAlias);
66+
}
67+
}

server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.elasticsearch.transport;
2020

21+
import org.elasticsearch.client.Client;
2122
import org.elasticsearch.core.internal.io.IOUtils;
2223
import org.elasticsearch.Version;
2324
import org.elasticsearch.action.ActionListener;
@@ -36,6 +37,7 @@
3637
import org.elasticsearch.common.transport.TransportAddress;
3738
import org.elasticsearch.common.unit.TimeValue;
3839
import org.elasticsearch.common.util.concurrent.CountDown;
40+
import org.elasticsearch.threadpool.ThreadPool;
3941

4042
import java.io.Closeable;
4143
import java.io.IOException;
@@ -398,4 +400,18 @@ public void onFailure(Exception e) {
398400
});
399401
}
400402
}
403+
404+
/**
405+
* Returns a client to the remote cluster if the given cluster alias exists.
406+
* @param threadPool the {@link ThreadPool} for the client
407+
* @param clusterAlias the cluster alias the remote cluster is registered under
408+
*
409+
* @throws IllegalArgumentException if the given clusterAlias doesn't exist
410+
*/
411+
public Client getRemoteClusterClient(ThreadPool threadPool, String clusterAlias) {
412+
if (transportService.getRemoteClusterService().getRemoteClusterNames().contains(clusterAlias) == false) {
413+
throw new IllegalArgumentException("unknown cluster alias [" + clusterAlias + "]");
414+
}
415+
return new RemoteClusterAwareClient(settings, threadPool, transportService, clusterAlias);
416+
}
401417
}

server/src/test/java/org/elasticsearch/client/node/NodeClientHeadersTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ protected Client buildClient(Settings headersSettings, GenericAction[] testedAct
4343
Settings settings = HEADER_SETTINGS;
4444
Actions actions = new Actions(settings, threadPool, testedActions);
4545
NodeClient client = new NodeClient(settings, threadPool);
46-
client.initialize(actions, () -> "test");
46+
client.initialize(actions, () -> "test", null);
4747
return client;
4848
}
4949

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.transport;
20+
21+
import org.elasticsearch.Version;
22+
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
23+
import org.elasticsearch.client.Client;
24+
import org.elasticsearch.cluster.ClusterName;
25+
import org.elasticsearch.cluster.node.DiscoveryNode;
26+
import org.elasticsearch.common.settings.Settings;
27+
import org.elasticsearch.test.ESTestCase;
28+
import org.elasticsearch.test.transport.MockTransportService;
29+
import org.elasticsearch.threadpool.TestThreadPool;
30+
import org.elasticsearch.threadpool.ThreadPool;
31+
32+
import java.util.Collections;
33+
import java.util.concurrent.TimeUnit;
34+
35+
import static org.elasticsearch.transport.RemoteClusterConnectionTests.startTransport;
36+
37+
public class RemoteClusterClientTests extends ESTestCase {
38+
private final ThreadPool threadPool = new TestThreadPool(getClass().getName());
39+
40+
@Override
41+
public void tearDown() throws Exception {
42+
super.tearDown();
43+
ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
44+
}
45+
46+
public void testConnectAndExecuteRequest() throws Exception {
47+
Settings remoteSettings = Settings.builder().put(ClusterName.CLUSTER_NAME_SETTING.getKey(), "foo_bar_cluster").build();
48+
try (MockTransportService remoteTransport = startTransport("remote_node", Collections.emptyList(), Version.CURRENT, threadPool,
49+
remoteSettings)) {
50+
DiscoveryNode remoteNode = remoteTransport.getLocalDiscoNode();
51+
52+
Settings localSettings = Settings.builder()
53+
.put(RemoteClusterService.ENABLE_REMOTE_CLUSTERS.getKey(), true)
54+
.put("search.remote.test.seeds", remoteNode.getAddress().getAddress() + ":" + remoteNode.getAddress().getPort()).build();
55+
try (MockTransportService service = MockTransportService.createNewService(localSettings, Version.CURRENT, threadPool, null)) {
56+
service.start();
57+
service.acceptIncomingRequests();
58+
RemoteClusterService remoteClusterService = service.getRemoteClusterService();
59+
assertTrue(remoteClusterService.isRemoteNodeConnected("test", remoteNode));
60+
Client client = remoteClusterService.getRemoteClusterClient(threadPool, "test");
61+
ClusterStateResponse clusterStateResponse = client.admin().cluster().prepareState().execute().get();
62+
assertNotNull(clusterStateResponse);
63+
assertEquals("foo_bar_cluster", clusterStateResponse.getState().getClusterName().value());
64+
// also test a failure, there is no handler for search registered
65+
ActionNotFoundTransportException ex = expectThrows(ActionNotFoundTransportException.class,
66+
() -> client.prepareSearch().get());
67+
assertEquals("No handler for action [indices:data/read/search]", ex.getMessage());
68+
}
69+
}
70+
}
71+
72+
public void testEnsureWeReconnect() throws Exception {
73+
Settings remoteSettings = Settings.builder().put(ClusterName.CLUSTER_NAME_SETTING.getKey(), "foo_bar_cluster").build();
74+
try (MockTransportService remoteTransport = startTransport("remote_node", Collections.emptyList(), Version.CURRENT, threadPool,
75+
remoteSettings)) {
76+
DiscoveryNode remoteNode = remoteTransport.getLocalDiscoNode();
77+
Settings localSettings = Settings.builder()
78+
.put(RemoteClusterService.ENABLE_REMOTE_CLUSTERS.getKey(), true)
79+
.put("search.remote.test.seeds", remoteNode.getAddress().getAddress() + ":" + remoteNode.getAddress().getPort()).build();
80+
try (MockTransportService service = MockTransportService.createNewService(localSettings, Version.CURRENT, threadPool, null)) {
81+
service.start();
82+
service.acceptIncomingRequests();
83+
service.disconnectFromNode(remoteNode);
84+
RemoteClusterService remoteClusterService = service.getRemoteClusterService();
85+
assertBusy(() -> assertFalse(remoteClusterService.isRemoteNodeConnected("test", remoteNode)));
86+
Client client = remoteClusterService.getRemoteClusterClient(threadPool, "test");
87+
ClusterStateResponse clusterStateResponse = client.admin().cluster().prepareState().execute().get();
88+
assertNotNull(clusterStateResponse);
89+
assertEquals("foo_bar_cluster", clusterStateResponse.getState().getClusterName().value());
90+
}
91+
}
92+
}
93+
94+
}

0 commit comments

Comments
 (0)