From c47686ba63ebfcb64ed0e15362b93fc7cb4d6461 Mon Sep 17 00:00:00 2001 From: javanna Date: Tue, 29 May 2018 14:28:10 +0200 Subject: [PATCH] Cross Cluster Search: do not use dedicated masters as gateways When we are connecting to a remote cluster we should never select dedicated master nodes as gateway nodes, or we will end up loading them with requests that should rather go to other type of nodes e.g. data nodes or coord_only nodes. This commit adds the selection based on the node role, to the existing selection based on version and potential node attributes. Closes #30687 --- .../transport/RemoteClusterService.java | 25 ++- .../transport/RemoteClusterServiceTests.java | 185 +++++++++++++++++- 2 files changed, 199 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index fe001e5ad19ba..7673a02b2d03d 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -18,8 +18,6 @@ */ package org.elasticsearch.transport; -import org.elasticsearch.client.Client; -import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.OriginalIndices; @@ -27,6 +25,7 @@ import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Booleans; import org.elasticsearch.common.Strings; @@ -36,6 +35,7 @@ import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.CountDown; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.threadpool.ThreadPool; import java.io.Closeable; @@ -97,6 +97,9 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl Setting.affixKeySetting("search.remote.", "skip_unavailable", key -> boolSetting(key, false, Setting.Property.NodeScope, Setting.Property.Dynamic), REMOTE_CLUSTERS_SEEDS); + private static final Predicate DEFAULT_NODE_PREDICATE = (node) -> Version.CURRENT.isCompatible(node.getVersion()) + && (node.isMasterNode() == false || node.isDataNode() || node.isIngestNode()); + private final TransportService transportService; private final int numRemoteConnections; private volatile Map remoteClusters = Collections.emptyMap(); @@ -121,13 +124,6 @@ private synchronized void updateRemoteClusters(Map> connectionListener.onResponse(null); } else { CountDown countDown = new CountDown(seeds.size()); - Predicate nodePredicate = (node) -> Version.CURRENT.isCompatible(node.getVersion()); - if (REMOTE_NODE_ATTRIBUTE.exists(settings)) { - // nodes can be tagged with node.attr.remote_gateway: true to allow a node to be a gateway node for - // cross cluster search - String attribute = REMOTE_NODE_ATTRIBUTE.get(settings); - nodePredicate = nodePredicate.and((node) -> Booleans.parseBoolean(node.getAttributes().getOrDefault(attribute, "false"))); - } remoteClusters.putAll(this.remoteClusters); for (Map.Entry> entry : seeds.entrySet()) { RemoteClusterConnection remote = this.remoteClusters.get(entry.getKey()); @@ -143,7 +139,7 @@ private synchronized void updateRemoteClusters(Map> if (remote == null) { // this is a new cluster we have to add a new representation remote = new RemoteClusterConnection(settings, entry.getKey(), entry.getValue(), transportService, numRemoteConnections, - nodePredicate); + getNodePredicate(settings)); remoteClusters.put(entry.getKey(), remote); } @@ -168,6 +164,15 @@ private synchronized void updateRemoteClusters(Map> this.remoteClusters = Collections.unmodifiableMap(remoteClusters); } + static Predicate getNodePredicate(Settings settings) { + if (REMOTE_NODE_ATTRIBUTE.exists(settings)) { + // nodes can be tagged with node.attr.remote_gateway: true to allow a node to be a gateway node for cross cluster search + String attribute = REMOTE_NODE_ATTRIBUTE.get(settings); + return DEFAULT_NODE_PREDICATE.and((node) -> Booleans.parseBoolean(node.getAttributes().getOrDefault(attribute, "false"))); + } + return DEFAULT_NODE_PREDICATE; + } + /** * Returns true if at least one remote cluster is configured */ diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java index e221a2fb2077e..5529f98af3342 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java @@ -18,7 +18,6 @@ */ package org.elasticsearch.transport; -import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.LatchedActionListener; @@ -30,7 +29,9 @@ import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.VersionUtils; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; @@ -40,6 +41,7 @@ import java.net.InetSocketAddress; import java.util.Arrays; import java.util.Collections; +import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -50,6 +52,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; +import java.util.function.Predicate; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.instanceOf; @@ -279,6 +282,75 @@ public void testRemoteNodeAttribute() throws IOException, InterruptedException { } } + public void testRemoteNodeRoles() throws IOException, InterruptedException { + final Settings settings = Settings.EMPTY; + final List knownNodes = new CopyOnWriteArrayList<>(); + final Settings data = Settings.builder().put("node.master", false).build(); + final Settings dedicatedMaster = Settings.builder().put("node.data", false).put("node.ingest", "false").build(); + try (MockTransportService c1N1 = + startTransport("cluster_1_node_1", knownNodes, Version.CURRENT, dedicatedMaster); + MockTransportService c1N2 = + startTransport("cluster_1_node_2", knownNodes, Version.CURRENT, data); + MockTransportService c2N1 = + startTransport("cluster_2_node_1", knownNodes, Version.CURRENT, dedicatedMaster); + MockTransportService c2N2 = + startTransport("cluster_2_node_2", knownNodes, Version.CURRENT, data)) { + final DiscoveryNode c1N1Node = c1N1.getLocalDiscoNode(); + final DiscoveryNode c1N2Node = c1N2.getLocalDiscoNode(); + final DiscoveryNode c2N1Node = c2N1.getLocalDiscoNode(); + final DiscoveryNode c2N2Node = c2N2.getLocalDiscoNode(); + knownNodes.add(c1N1Node); + knownNodes.add(c1N2Node); + knownNodes.add(c2N1Node); + knownNodes.add(c2N2Node); + Collections.shuffle(knownNodes, random()); + + try (MockTransportService transportService = MockTransportService.createNewService( + settings, + Version.CURRENT, + threadPool, + null)) { + transportService.start(); + transportService.acceptIncomingRequests(); + final Settings.Builder builder = Settings.builder(); + builder.putList("search.remote.cluster_1.seeds", c1N1Node.getAddress().toString()); + builder.putList("search.remote.cluster_2.seeds", c2N1Node.getAddress().toString()); + try (RemoteClusterService service = new RemoteClusterService(settings, transportService)) { + assertFalse(service.isCrossClusterSearchEnabled()); + service.initializeRemoteClusters(); + assertFalse(service.isCrossClusterSearchEnabled()); + + final InetSocketAddress c1N1Address = c1N1Node.getAddress().address(); + final InetSocketAddress c1N2Address = c1N2Node.getAddress().address(); + final InetSocketAddress c2N1Address = c2N1Node.getAddress().address(); + final InetSocketAddress c2N2Address = c2N2Node.getAddress().address(); + + final CountDownLatch firstLatch = new CountDownLatch(1); + service.updateRemoteCluster( + "cluster_1", + Arrays.asList(c1N1Address, c1N2Address), + connectionListener(firstLatch)); + firstLatch.await(); + + final CountDownLatch secondLatch = new CountDownLatch(1); + service.updateRemoteCluster( + "cluster_2", + Arrays.asList(c2N1Address, c2N2Address), + connectionListener(secondLatch)); + secondLatch.await(); + + assertTrue(service.isCrossClusterSearchEnabled()); + assertTrue(service.isRemoteClusterRegistered("cluster_1")); + assertFalse(service.isRemoteNodeConnected("cluster_1", c1N1Node)); + assertTrue(service.isRemoteNodeConnected("cluster_1", c1N2Node)); + assertTrue(service.isRemoteClusterRegistered("cluster_2")); + assertFalse(service.isRemoteNodeConnected("cluster_2", c2N1Node)); + assertTrue(service.isRemoteNodeConnected("cluster_2", c2N2Node)); + } + } + } + } + private ActionListener connectionListener(final CountDownLatch latch) { return ActionListener.wrap(x -> latch.countDown(), x -> fail()); } @@ -630,4 +702,115 @@ public void testRemoteClusterSkipIfDisconnectedSetting() { } } } + + public void testGetNodePredicateNodeRoles() { + TransportAddress address = new TransportAddress(TransportAddress.META_ADDRESS, 0); + Predicate nodePredicate = RemoteClusterService.getNodePredicate(Settings.EMPTY); + { + DiscoveryNode all = new DiscoveryNode("id", address, Collections.emptyMap(), + new HashSet<>(EnumSet.allOf(DiscoveryNode.Role.class)), Version.CURRENT); + assertTrue(nodePredicate.test(all)); + } + { + DiscoveryNode dataMaster = new DiscoveryNode("id", address, Collections.emptyMap(), + new HashSet<>(EnumSet.of(DiscoveryNode.Role.DATA, DiscoveryNode.Role.MASTER)), Version.CURRENT); + assertTrue(nodePredicate.test(dataMaster)); + } + { + DiscoveryNode dedicatedMaster = new DiscoveryNode("id", address, Collections.emptyMap(), + new HashSet<>(EnumSet.of(DiscoveryNode.Role.MASTER)), Version.CURRENT); + assertFalse(nodePredicate.test(dedicatedMaster)); + } + { + DiscoveryNode dedicatedIngest = new DiscoveryNode("id", address, Collections.emptyMap(), + new HashSet<>(EnumSet.of(DiscoveryNode.Role.INGEST)), Version.CURRENT); + assertTrue(nodePredicate.test(dedicatedIngest)); + } + { + DiscoveryNode masterIngest = new DiscoveryNode("id", address, Collections.emptyMap(), + new HashSet<>(EnumSet.of(DiscoveryNode.Role.INGEST, DiscoveryNode.Role.MASTER)), Version.CURRENT); + assertTrue(nodePredicate.test(masterIngest)); + } + { + DiscoveryNode dedicatedData = new DiscoveryNode("id", address, Collections.emptyMap(), + new HashSet<>(EnumSet.of(DiscoveryNode.Role.DATA)), Version.CURRENT); + assertTrue(nodePredicate.test(dedicatedData)); + } + { + DiscoveryNode ingestData = new DiscoveryNode("id", address, Collections.emptyMap(), + new HashSet<>(EnumSet.of(DiscoveryNode.Role.DATA, DiscoveryNode.Role.INGEST)), Version.CURRENT); + assertTrue(nodePredicate.test(ingestData)); + } + { + DiscoveryNode coordOnly = new DiscoveryNode("id", address, Collections.emptyMap(), + new HashSet<>(EnumSet.noneOf(DiscoveryNode.Role.class)), Version.CURRENT); + assertTrue(nodePredicate.test(coordOnly)); + } + } + + public void testGetNodePredicateNodeVersion() { + TransportAddress address = new TransportAddress(TransportAddress.META_ADDRESS, 0); + Set roles = new HashSet<>(EnumSet.allOf(DiscoveryNode.Role.class)); + Predicate nodePredicate = RemoteClusterService.getNodePredicate(Settings.EMPTY); + Version version = VersionUtils.randomVersion(random()); + DiscoveryNode node = new DiscoveryNode("id", address, Collections.emptyMap(), roles, version); + assertThat(nodePredicate.test(node), equalTo(Version.CURRENT.isCompatible(version))); + } + + public void testGetNodePredicateNodeAttrs() { + TransportAddress address = new TransportAddress(TransportAddress.META_ADDRESS, 0); + Set roles = new HashSet<>(EnumSet.allOf(DiscoveryNode.Role.class)); + Settings settings = Settings.builder().put("search.remote.node.attr", "gateway").build(); + Predicate nodePredicate = RemoteClusterService.getNodePredicate(settings); + { + DiscoveryNode nonGatewayNode = new DiscoveryNode("id", address, Collections.singletonMap("gateway", "false"), + roles, Version.CURRENT); + assertFalse(nodePredicate.test(nonGatewayNode)); + assertTrue(RemoteClusterService.getNodePredicate(Settings.EMPTY).test(nonGatewayNode)); + } + { + DiscoveryNode gatewayNode = new DiscoveryNode("id", address, Collections.singletonMap("gateway", "true"), + roles, Version.CURRENT); + assertTrue(nodePredicate.test(gatewayNode)); + assertTrue(RemoteClusterService.getNodePredicate(Settings.EMPTY).test(gatewayNode)); + } + { + DiscoveryNode noAttrNode = new DiscoveryNode("id", address, Collections.emptyMap(), roles, Version.CURRENT); + assertFalse(nodePredicate.test(noAttrNode)); + assertTrue(RemoteClusterService.getNodePredicate(Settings.EMPTY).test(noAttrNode)); + } + } + + public void testGetNodePredicatesCombination() { + TransportAddress address = new TransportAddress(TransportAddress.META_ADDRESS, 0); + Settings settings = Settings.builder().put("search.remote.node.attr", "gateway").build(); + Predicate nodePredicate = RemoteClusterService.getNodePredicate(settings); + Set allRoles = new HashSet<>(EnumSet.allOf(DiscoveryNode.Role.class)); + Set dedicatedMasterRoles = new HashSet<>(EnumSet.of(DiscoveryNode.Role.MASTER)); + { + DiscoveryNode node = new DiscoveryNode("id", address, Collections.singletonMap("gateway", "true"), + dedicatedMasterRoles, Version.CURRENT); + assertFalse(nodePredicate.test(node)); + } + { + DiscoveryNode node = new DiscoveryNode("id", address, Collections.singletonMap("gateway", "false"), + dedicatedMasterRoles, Version.CURRENT); + assertFalse(nodePredicate.test(node)); + } + { + DiscoveryNode node = new DiscoveryNode("id", address, Collections.singletonMap("gateway", "false"), + dedicatedMasterRoles, Version.CURRENT); + assertFalse(nodePredicate.test(node)); + } + { + DiscoveryNode node = new DiscoveryNode("id", address, Collections.singletonMap("gateway", "true"), + allRoles, Version.CURRENT); + assertTrue(nodePredicate.test(node)); + } + { + DiscoveryNode node = new DiscoveryNode("id", address, Collections.singletonMap("gateway", "true"), + allRoles, Version.V_5_3_0); + assertFalse(nodePredicate.test(node)); + } + } }