Skip to content

Commit 3f21f93

Browse files
committed
Cross Cluster Search: do not use dedicated masters as gateways (#30926)
When connecting to a remote cluster, we should never select dedicated master nodes as gateway nodes; otherwise we end up loading them with requests that should rather go to other types of nodes, e.g. data nodes or coord_only nodes. This commit adds selection based on the node role to the existing selection based on version and optional node attributes. Closes #30687
1 parent 7706aae commit 3f21f93

File tree

2 files changed

+199
-11
lines changed

2 files changed

+199
-11
lines changed

server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@
1818
*/
1919
package org.elasticsearch.transport;
2020

21-
import org.elasticsearch.client.Client;
22-
import org.elasticsearch.core.internal.io.IOUtils;
2321
import org.elasticsearch.Version;
2422
import org.elasticsearch.action.ActionListener;
2523
import org.elasticsearch.action.OriginalIndices;
@@ -28,6 +26,7 @@
2826
import org.elasticsearch.action.support.GroupedActionListener;
2927
import org.elasticsearch.action.support.IndicesOptions;
3028
import org.elasticsearch.action.support.PlainActionFuture;
29+
import org.elasticsearch.client.Client;
3130
import org.elasticsearch.cluster.node.DiscoveryNode;
3231
import org.elasticsearch.common.Booleans;
3332
import org.elasticsearch.common.Strings;
@@ -37,6 +36,7 @@
3736
import org.elasticsearch.common.transport.TransportAddress;
3837
import org.elasticsearch.common.unit.TimeValue;
3938
import org.elasticsearch.common.util.concurrent.CountDown;
39+
import org.elasticsearch.core.internal.io.IOUtils;
4040
import org.elasticsearch.threadpool.ThreadPool;
4141

4242
import java.io.Closeable;
@@ -98,6 +98,9 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
9898
Setting.affixKeySetting("search.remote.", "skip_unavailable",
9999
key -> boolSetting(key, false, Setting.Property.NodeScope, Setting.Property.Dynamic), REMOTE_CLUSTERS_SEEDS);
100100

101+
private static final Predicate<DiscoveryNode> DEFAULT_NODE_PREDICATE = (node) -> Version.CURRENT.isCompatible(node.getVersion())
102+
&& (node.isMasterNode() == false || node.isDataNode() || node.isIngestNode());
103+
101104
private final TransportService transportService;
102105
private final int numRemoteConnections;
103106
private volatile Map<String, RemoteClusterConnection> remoteClusters = Collections.emptyMap();
@@ -122,13 +125,6 @@ private synchronized void updateRemoteClusters(Map<String, List<DiscoveryNode>>
122125
connectionListener.onResponse(null);
123126
} else {
124127
CountDown countDown = new CountDown(seeds.size());
125-
Predicate<DiscoveryNode> nodePredicate = (node) -> Version.CURRENT.isCompatible(node.getVersion());
126-
if (REMOTE_NODE_ATTRIBUTE.exists(settings)) {
127-
// nodes can be tagged with node.attr.remote_gateway: true to allow a node to be a gateway node for
128-
// cross cluster search
129-
String attribute = REMOTE_NODE_ATTRIBUTE.get(settings);
130-
nodePredicate = nodePredicate.and((node) -> Booleans.parseBoolean(node.getAttributes().getOrDefault(attribute, "false")));
131-
}
132128
remoteClusters.putAll(this.remoteClusters);
133129
for (Map.Entry<String, List<DiscoveryNode>> entry : seeds.entrySet()) {
134130
RemoteClusterConnection remote = this.remoteClusters.get(entry.getKey());
@@ -144,7 +140,7 @@ private synchronized void updateRemoteClusters(Map<String, List<DiscoveryNode>>
144140

145141
if (remote == null) { // this is a new cluster we have to add a new representation
146142
remote = new RemoteClusterConnection(settings, entry.getKey(), entry.getValue(), transportService, numRemoteConnections,
147-
nodePredicate);
143+
getNodePredicate(settings));
148144
remoteClusters.put(entry.getKey(), remote);
149145
}
150146

@@ -169,6 +165,15 @@ private synchronized void updateRemoteClusters(Map<String, List<DiscoveryNode>>
169165
this.remoteClusters = Collections.unmodifiableMap(remoteClusters);
170166
}
171167

168+
static Predicate<DiscoveryNode> getNodePredicate(Settings settings) {
169+
if (REMOTE_NODE_ATTRIBUTE.exists(settings)) {
170+
// nodes can be tagged with node.attr.remote_gateway: true to allow a node to be a gateway node for cross cluster search
171+
String attribute = REMOTE_NODE_ATTRIBUTE.get(settings);
172+
return DEFAULT_NODE_PREDICATE.and((node) -> Booleans.parseBoolean(node.getAttributes().getOrDefault(attribute, "false")));
173+
}
174+
return DEFAULT_NODE_PREDICATE;
175+
}
176+
172177
/**
173178
* Returns <code>true</code> if at least one remote cluster is configured
174179
*/

server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java

Lines changed: 184 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
*/
1919
package org.elasticsearch.transport;
2020

21-
import org.elasticsearch.core.internal.io.IOUtils;
2221
import org.elasticsearch.Version;
2322
import org.elasticsearch.action.ActionListener;
2423
import org.elasticsearch.action.LatchedActionListener;
@@ -30,7 +29,9 @@
3029
import org.elasticsearch.common.settings.ClusterSettings;
3130
import org.elasticsearch.common.settings.Settings;
3231
import org.elasticsearch.common.transport.TransportAddress;
32+
import org.elasticsearch.core.internal.io.IOUtils;
3333
import org.elasticsearch.test.ESTestCase;
34+
import org.elasticsearch.test.VersionUtils;
3435
import org.elasticsearch.test.transport.MockTransportService;
3536
import org.elasticsearch.threadpool.TestThreadPool;
3637
import org.elasticsearch.threadpool.ThreadPool;
@@ -40,6 +41,7 @@
4041
import java.net.InetSocketAddress;
4142
import java.util.Arrays;
4243
import java.util.Collections;
44+
import java.util.EnumSet;
4345
import java.util.HashMap;
4446
import java.util.HashSet;
4547
import java.util.List;
@@ -50,6 +52,7 @@
5052
import java.util.concurrent.TimeUnit;
5153
import java.util.concurrent.atomic.AtomicReference;
5254
import java.util.function.BiFunction;
55+
import java.util.function.Predicate;
5356

5457
import static org.hamcrest.CoreMatchers.containsString;
5558
import static org.hamcrest.CoreMatchers.instanceOf;
@@ -279,6 +282,75 @@ public void testRemoteNodeAttribute() throws IOException, InterruptedException {
279282
}
280283
}
281284

285+
public void testRemoteNodeRoles() throws IOException, InterruptedException {
286+
final Settings settings = Settings.EMPTY;
287+
final List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
288+
final Settings data = Settings.builder().put("node.master", false).build();
289+
final Settings dedicatedMaster = Settings.builder().put("node.data", false).put("node.ingest", "false").build();
290+
try (MockTransportService c1N1 =
291+
startTransport("cluster_1_node_1", knownNodes, Version.CURRENT, dedicatedMaster);
292+
MockTransportService c1N2 =
293+
startTransport("cluster_1_node_2", knownNodes, Version.CURRENT, data);
294+
MockTransportService c2N1 =
295+
startTransport("cluster_2_node_1", knownNodes, Version.CURRENT, dedicatedMaster);
296+
MockTransportService c2N2 =
297+
startTransport("cluster_2_node_2", knownNodes, Version.CURRENT, data)) {
298+
final DiscoveryNode c1N1Node = c1N1.getLocalDiscoNode();
299+
final DiscoveryNode c1N2Node = c1N2.getLocalDiscoNode();
300+
final DiscoveryNode c2N1Node = c2N1.getLocalDiscoNode();
301+
final DiscoveryNode c2N2Node = c2N2.getLocalDiscoNode();
302+
knownNodes.add(c1N1Node);
303+
knownNodes.add(c1N2Node);
304+
knownNodes.add(c2N1Node);
305+
knownNodes.add(c2N2Node);
306+
Collections.shuffle(knownNodes, random());
307+
308+
try (MockTransportService transportService = MockTransportService.createNewService(
309+
settings,
310+
Version.CURRENT,
311+
threadPool,
312+
null)) {
313+
transportService.start();
314+
transportService.acceptIncomingRequests();
315+
final Settings.Builder builder = Settings.builder();
316+
builder.putList("search.remote.cluster_1.seeds", c1N1Node.getAddress().toString());
317+
builder.putList("search.remote.cluster_2.seeds", c2N1Node.getAddress().toString());
318+
try (RemoteClusterService service = new RemoteClusterService(settings, transportService)) {
319+
assertFalse(service.isCrossClusterSearchEnabled());
320+
service.initializeRemoteClusters();
321+
assertFalse(service.isCrossClusterSearchEnabled());
322+
323+
final InetSocketAddress c1N1Address = c1N1Node.getAddress().address();
324+
final InetSocketAddress c1N2Address = c1N2Node.getAddress().address();
325+
final InetSocketAddress c2N1Address = c2N1Node.getAddress().address();
326+
final InetSocketAddress c2N2Address = c2N2Node.getAddress().address();
327+
328+
final CountDownLatch firstLatch = new CountDownLatch(1);
329+
service.updateRemoteCluster(
330+
"cluster_1",
331+
Arrays.asList(c1N1Address, c1N2Address),
332+
connectionListener(firstLatch));
333+
firstLatch.await();
334+
335+
final CountDownLatch secondLatch = new CountDownLatch(1);
336+
service.updateRemoteCluster(
337+
"cluster_2",
338+
Arrays.asList(c2N1Address, c2N2Address),
339+
connectionListener(secondLatch));
340+
secondLatch.await();
341+
342+
assertTrue(service.isCrossClusterSearchEnabled());
343+
assertTrue(service.isRemoteClusterRegistered("cluster_1"));
344+
assertFalse(service.isRemoteNodeConnected("cluster_1", c1N1Node));
345+
assertTrue(service.isRemoteNodeConnected("cluster_1", c1N2Node));
346+
assertTrue(service.isRemoteClusterRegistered("cluster_2"));
347+
assertFalse(service.isRemoteNodeConnected("cluster_2", c2N1Node));
348+
assertTrue(service.isRemoteNodeConnected("cluster_2", c2N2Node));
349+
}
350+
}
351+
}
352+
}
353+
282354
private ActionListener<Void> connectionListener(final CountDownLatch latch) {
283355
return ActionListener.wrap(x -> latch.countDown(), x -> fail());
284356
}
@@ -630,4 +702,115 @@ public void testRemoteClusterSkipIfDisconnectedSetting() {
630702
}
631703
}
632704
}
705+
706+
public void testGetNodePredicateNodeRoles() {
707+
TransportAddress address = new TransportAddress(TransportAddress.META_ADDRESS, 0);
708+
Predicate<DiscoveryNode> nodePredicate = RemoteClusterService.getNodePredicate(Settings.EMPTY);
709+
{
710+
DiscoveryNode all = new DiscoveryNode("id", address, Collections.emptyMap(),
711+
new HashSet<>(EnumSet.allOf(DiscoveryNode.Role.class)), Version.CURRENT);
712+
assertTrue(nodePredicate.test(all));
713+
}
714+
{
715+
DiscoveryNode dataMaster = new DiscoveryNode("id", address, Collections.emptyMap(),
716+
new HashSet<>(EnumSet.of(DiscoveryNode.Role.DATA, DiscoveryNode.Role.MASTER)), Version.CURRENT);
717+
assertTrue(nodePredicate.test(dataMaster));
718+
}
719+
{
720+
DiscoveryNode dedicatedMaster = new DiscoveryNode("id", address, Collections.emptyMap(),
721+
new HashSet<>(EnumSet.of(DiscoveryNode.Role.MASTER)), Version.CURRENT);
722+
assertFalse(nodePredicate.test(dedicatedMaster));
723+
}
724+
{
725+
DiscoveryNode dedicatedIngest = new DiscoveryNode("id", address, Collections.emptyMap(),
726+
new HashSet<>(EnumSet.of(DiscoveryNode.Role.INGEST)), Version.CURRENT);
727+
assertTrue(nodePredicate.test(dedicatedIngest));
728+
}
729+
{
730+
DiscoveryNode masterIngest = new DiscoveryNode("id", address, Collections.emptyMap(),
731+
new HashSet<>(EnumSet.of(DiscoveryNode.Role.INGEST, DiscoveryNode.Role.MASTER)), Version.CURRENT);
732+
assertTrue(nodePredicate.test(masterIngest));
733+
}
734+
{
735+
DiscoveryNode dedicatedData = new DiscoveryNode("id", address, Collections.emptyMap(),
736+
new HashSet<>(EnumSet.of(DiscoveryNode.Role.DATA)), Version.CURRENT);
737+
assertTrue(nodePredicate.test(dedicatedData));
738+
}
739+
{
740+
DiscoveryNode ingestData = new DiscoveryNode("id", address, Collections.emptyMap(),
741+
new HashSet<>(EnumSet.of(DiscoveryNode.Role.DATA, DiscoveryNode.Role.INGEST)), Version.CURRENT);
742+
assertTrue(nodePredicate.test(ingestData));
743+
}
744+
{
745+
DiscoveryNode coordOnly = new DiscoveryNode("id", address, Collections.emptyMap(),
746+
new HashSet<>(EnumSet.noneOf(DiscoveryNode.Role.class)), Version.CURRENT);
747+
assertTrue(nodePredicate.test(coordOnly));
748+
}
749+
}
750+
751+
public void testGetNodePredicateNodeVersion() {
752+
TransportAddress address = new TransportAddress(TransportAddress.META_ADDRESS, 0);
753+
Set<DiscoveryNode.Role> roles = new HashSet<>(EnumSet.allOf(DiscoveryNode.Role.class));
754+
Predicate<DiscoveryNode> nodePredicate = RemoteClusterService.getNodePredicate(Settings.EMPTY);
755+
Version version = VersionUtils.randomVersion(random());
756+
DiscoveryNode node = new DiscoveryNode("id", address, Collections.emptyMap(), roles, version);
757+
assertThat(nodePredicate.test(node), equalTo(Version.CURRENT.isCompatible(version)));
758+
}
759+
760+
public void testGetNodePredicateNodeAttrs() {
761+
TransportAddress address = new TransportAddress(TransportAddress.META_ADDRESS, 0);
762+
Set<DiscoveryNode.Role> roles = new HashSet<>(EnumSet.allOf(DiscoveryNode.Role.class));
763+
Settings settings = Settings.builder().put("search.remote.node.attr", "gateway").build();
764+
Predicate<DiscoveryNode> nodePredicate = RemoteClusterService.getNodePredicate(settings);
765+
{
766+
DiscoveryNode nonGatewayNode = new DiscoveryNode("id", address, Collections.singletonMap("gateway", "false"),
767+
roles, Version.CURRENT);
768+
assertFalse(nodePredicate.test(nonGatewayNode));
769+
assertTrue(RemoteClusterService.getNodePredicate(Settings.EMPTY).test(nonGatewayNode));
770+
}
771+
{
772+
DiscoveryNode gatewayNode = new DiscoveryNode("id", address, Collections.singletonMap("gateway", "true"),
773+
roles, Version.CURRENT);
774+
assertTrue(nodePredicate.test(gatewayNode));
775+
assertTrue(RemoteClusterService.getNodePredicate(Settings.EMPTY).test(gatewayNode));
776+
}
777+
{
778+
DiscoveryNode noAttrNode = new DiscoveryNode("id", address, Collections.emptyMap(), roles, Version.CURRENT);
779+
assertFalse(nodePredicate.test(noAttrNode));
780+
assertTrue(RemoteClusterService.getNodePredicate(Settings.EMPTY).test(noAttrNode));
781+
}
782+
}
783+
784+
public void testGetNodePredicatesCombination() {
785+
TransportAddress address = new TransportAddress(TransportAddress.META_ADDRESS, 0);
786+
Settings settings = Settings.builder().put("search.remote.node.attr", "gateway").build();
787+
Predicate<DiscoveryNode> nodePredicate = RemoteClusterService.getNodePredicate(settings);
788+
Set<DiscoveryNode.Role> allRoles = new HashSet<>(EnumSet.allOf(DiscoveryNode.Role.class));
789+
Set<DiscoveryNode.Role> dedicatedMasterRoles = new HashSet<>(EnumSet.of(DiscoveryNode.Role.MASTER));
790+
{
791+
DiscoveryNode node = new DiscoveryNode("id", address, Collections.singletonMap("gateway", "true"),
792+
dedicatedMasterRoles, Version.CURRENT);
793+
assertFalse(nodePredicate.test(node));
794+
}
795+
{
796+
DiscoveryNode node = new DiscoveryNode("id", address, Collections.singletonMap("gateway", "false"),
797+
dedicatedMasterRoles, Version.CURRENT);
798+
assertFalse(nodePredicate.test(node));
799+
}
800+
{
801+
DiscoveryNode node = new DiscoveryNode("id", address, Collections.singletonMap("gateway", "false"),
802+
dedicatedMasterRoles, Version.CURRENT);
803+
assertFalse(nodePredicate.test(node));
804+
}
805+
{
806+
DiscoveryNode node = new DiscoveryNode("id", address, Collections.singletonMap("gateway", "true"),
807+
allRoles, Version.CURRENT);
808+
assertTrue(nodePredicate.test(node));
809+
}
810+
{
811+
DiscoveryNode node = new DiscoveryNode("id", address, Collections.singletonMap("gateway", "true"),
812+
allRoles, Version.V_5_3_0);
813+
assertFalse(nodePredicate.test(node));
814+
}
815+
}
633816
}

0 commit comments

Comments
 (0)