Skip to content

Commit 3c21e46

Browse files
authored
Cross Cluster Search: do not use dedicated masters as gateways (#30926)
When connecting to a remote cluster, we should never select dedicated master nodes as gateway nodes; otherwise we end up loading them with requests that should instead go to other types of nodes, e.g. data nodes or coord_only nodes. This commit adds selection based on the node role to the existing selection based on version and, when configured, node attributes. Closes #30687
1 parent 6341d10 commit 3c21e46

File tree

2 files changed

+199
-11
lines changed

2 files changed

+199
-11
lines changed

server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,14 @@
1818
*/
1919
package org.elasticsearch.transport;
2020

21-
import org.elasticsearch.client.Client;
22-
import org.elasticsearch.core.internal.io.IOUtils;
2321
import org.elasticsearch.Version;
2422
import org.elasticsearch.action.ActionListener;
2523
import org.elasticsearch.action.OriginalIndices;
2624
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest;
2725
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
2826
import org.elasticsearch.action.support.IndicesOptions;
2927
import org.elasticsearch.action.support.PlainActionFuture;
28+
import org.elasticsearch.client.Client;
3029
import org.elasticsearch.cluster.node.DiscoveryNode;
3130
import org.elasticsearch.common.Booleans;
3231
import org.elasticsearch.common.Strings;
@@ -36,6 +35,7 @@
3635
import org.elasticsearch.common.transport.TransportAddress;
3736
import org.elasticsearch.common.unit.TimeValue;
3837
import org.elasticsearch.common.util.concurrent.CountDown;
38+
import org.elasticsearch.core.internal.io.IOUtils;
3939
import org.elasticsearch.threadpool.ThreadPool;
4040

4141
import java.io.Closeable;
@@ -97,6 +97,9 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
9797
Setting.affixKeySetting("search.remote.", "skip_unavailable",
9898
key -> boolSetting(key, false, Setting.Property.NodeScope, Setting.Property.Dynamic), REMOTE_CLUSTERS_SEEDS);
9999

100+
/**
 * Baseline filter applied to remote nodes. A node passes only if its version is compatible with
 * {@link Version#CURRENT} and it is not master-only: it is either not a master node at all, or it
 * is a master node that is also a data or ingest node.
 */
private static final Predicate<DiscoveryNode> DEFAULT_NODE_PREDICATE = (node) -> Version.CURRENT.isCompatible(node.getVersion())
101+
&& (node.isMasterNode() == false || node.isDataNode() || node.isIngestNode());
102+
100103
private final TransportService transportService;
101104
private final int numRemoteConnections;
102105
private volatile Map<String, RemoteClusterConnection> remoteClusters = Collections.emptyMap();
@@ -121,13 +124,6 @@ private synchronized void updateRemoteClusters(Map<String, List<DiscoveryNode>>
121124
connectionListener.onResponse(null);
122125
} else {
123126
CountDown countDown = new CountDown(seeds.size());
124-
Predicate<DiscoveryNode> nodePredicate = (node) -> Version.CURRENT.isCompatible(node.getVersion());
125-
if (REMOTE_NODE_ATTRIBUTE.exists(settings)) {
126-
// nodes can be tagged with node.attr.remote_gateway: true to allow a node to be a gateway node for
127-
// cross cluster search
128-
String attribute = REMOTE_NODE_ATTRIBUTE.get(settings);
129-
nodePredicate = nodePredicate.and((node) -> Booleans.parseBoolean(node.getAttributes().getOrDefault(attribute, "false")));
130-
}
131127
remoteClusters.putAll(this.remoteClusters);
132128
for (Map.Entry<String, List<DiscoveryNode>> entry : seeds.entrySet()) {
133129
RemoteClusterConnection remote = this.remoteClusters.get(entry.getKey());
@@ -143,7 +139,7 @@ private synchronized void updateRemoteClusters(Map<String, List<DiscoveryNode>>
143139

144140
if (remote == null) { // this is a new cluster we have to add a new representation
145141
remote = new RemoteClusterConnection(settings, entry.getKey(), entry.getValue(), transportService, numRemoteConnections,
146-
nodePredicate);
142+
getNodePredicate(settings));
147143
remoteClusters.put(entry.getKey(), remote);
148144
}
149145

@@ -168,6 +164,15 @@ private synchronized void updateRemoteClusters(Map<String, List<DiscoveryNode>>
168164
this.remoteClusters = Collections.unmodifiableMap(remoteClusters);
169165
}
170166

167+
/**
 * Builds the predicate that decides whether a remote node may be used as a gateway node for
 * cross cluster search. The result always includes {@code DEFAULT_NODE_PREDICATE}; when
 * {@code REMOTE_NODE_ATTRIBUTE} exists in the given settings, the node must additionally carry the
 * configured attribute with a value that parses to {@code true}.
 *
 * @param settings the settings checked for {@code REMOTE_NODE_ATTRIBUTE}
 * @return the default predicate, combined with the attribute check when the attribute is configured
 */
static Predicate<DiscoveryNode> getNodePredicate(Settings settings) {
168+
if (REMOTE_NODE_ATTRIBUTE.exists(settings)) {
169+
// nodes can be tagged with node.attr.remote_gateway: true to allow a node to be a gateway node for cross cluster search
170+
String attribute = REMOTE_NODE_ATTRIBUTE.get(settings);
171+
// a node that lacks the attribute falls back to "false" and is therefore rejected
return DEFAULT_NODE_PREDICATE.and((node) -> Booleans.parseBoolean(node.getAttributes().getOrDefault(attribute, "false")));
172+
}
173+
return DEFAULT_NODE_PREDICATE;
174+
}
175+
171176
/**
172177
* Returns <code>true</code> if at least one remote cluster is configured
173178
*/

server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java

Lines changed: 184 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
*/
1919
package org.elasticsearch.transport;
2020

21-
import org.elasticsearch.core.internal.io.IOUtils;
2221
import org.elasticsearch.Version;
2322
import org.elasticsearch.action.ActionListener;
2423
import org.elasticsearch.action.LatchedActionListener;
@@ -30,7 +29,9 @@
3029
import org.elasticsearch.common.settings.ClusterSettings;
3130
import org.elasticsearch.common.settings.Settings;
3231
import org.elasticsearch.common.transport.TransportAddress;
32+
import org.elasticsearch.core.internal.io.IOUtils;
3333
import org.elasticsearch.test.ESTestCase;
34+
import org.elasticsearch.test.VersionUtils;
3435
import org.elasticsearch.test.transport.MockTransportService;
3536
import org.elasticsearch.threadpool.TestThreadPool;
3637
import org.elasticsearch.threadpool.ThreadPool;
@@ -40,6 +41,7 @@
4041
import java.net.InetSocketAddress;
4142
import java.util.Arrays;
4243
import java.util.Collections;
44+
import java.util.EnumSet;
4345
import java.util.HashMap;
4446
import java.util.HashSet;
4547
import java.util.List;
@@ -50,6 +52,7 @@
5052
import java.util.concurrent.TimeUnit;
5153
import java.util.concurrent.atomic.AtomicReference;
5254
import java.util.function.BiFunction;
55+
import java.util.function.Predicate;
5356

5457
import static org.hamcrest.CoreMatchers.containsString;
5558
import static org.hamcrest.CoreMatchers.instanceOf;
@@ -279,6 +282,75 @@ public void testRemoteNodeAttribute() throws IOException, InterruptedException {
279282
}
280283
}
281284

285+
/**
 * Starts two transports per remote cluster, one with the {@code dedicatedMaster} settings and one
 * with the {@code data} settings, registers both addresses of each cluster as seeds, and asserts
 * that only the node started with the {@code data} settings ends up connected.
 */
public void testRemoteNodeRoles() throws IOException, InterruptedException {
286+
final Settings settings = Settings.EMPTY;
287+
final List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
288+
// node.master disabled: a node that is not master eligible
final Settings data = Settings.builder().put("node.master", false).build();
289+
// node.data and node.ingest disabled; presumably master stays enabled by default, which would
// make this a dedicated master node -- relies on role defaults not visible here
final Settings dedicatedMaster = Settings.builder().put("node.data", false).put("node.ingest", "false").build();
290+
try (MockTransportService c1N1 =
291+
startTransport("cluster_1_node_1", knownNodes, Version.CURRENT, dedicatedMaster);
292+
MockTransportService c1N2 =
293+
startTransport("cluster_1_node_2", knownNodes, Version.CURRENT, data);
294+
MockTransportService c2N1 =
295+
startTransport("cluster_2_node_1", knownNodes, Version.CURRENT, dedicatedMaster);
296+
MockTransportService c2N2 =
297+
startTransport("cluster_2_node_2", knownNodes, Version.CURRENT, data)) {
298+
final DiscoveryNode c1N1Node = c1N1.getLocalDiscoNode();
299+
final DiscoveryNode c1N2Node = c1N2.getLocalDiscoNode();
300+
final DiscoveryNode c2N1Node = c2N1.getLocalDiscoNode();
301+
final DiscoveryNode c2N2Node = c2N2.getLocalDiscoNode();
302+
knownNodes.add(c1N1Node);
303+
knownNodes.add(c1N2Node);
304+
knownNodes.add(c2N1Node);
305+
knownNodes.add(c2N2Node);
306+
Collections.shuffle(knownNodes, random());
307+
308+
try (MockTransportService transportService = MockTransportService.createNewService(
309+
settings,
310+
Version.CURRENT,
311+
threadPool,
312+
null)) {
313+
transportService.start();
314+
transportService.acceptIncomingRequests();
315+
// NOTE(review): builder is populated below but never read in this method; the service is
// created from settings (Settings.EMPTY) and the clusters are registered via updateRemoteCluster
final Settings.Builder builder = Settings.builder();
316+
builder.putList("search.remote.cluster_1.seeds", c1N1Node.getAddress().toString());
317+
builder.putList("search.remote.cluster_2.seeds", c2N1Node.getAddress().toString());
318+
try (RemoteClusterService service = new RemoteClusterService(settings, transportService)) {
319+
// no remote cluster is registered yet, before and after initialization
assertFalse(service.isCrossClusterSearchEnabled());
320+
service.initializeRemoteClusters();
321+
assertFalse(service.isCrossClusterSearchEnabled());
322+
323+
final InetSocketAddress c1N1Address = c1N1Node.getAddress().address();
324+
final InetSocketAddress c1N2Address = c1N2Node.getAddress().address();
325+
final InetSocketAddress c2N1Address = c2N1Node.getAddress().address();
326+
final InetSocketAddress c2N2Address = c2N2Node.getAddress().address();
327+
328+
// register cluster_1 with both of its nodes as seeds and wait for the listener to fire
final CountDownLatch firstLatch = new CountDownLatch(1);
329+
service.updateRemoteCluster(
330+
"cluster_1",
331+
Arrays.asList(c1N1Address, c1N2Address),
332+
connectionListener(firstLatch));
333+
firstLatch.await();
334+
335+
// same for cluster_2
final CountDownLatch secondLatch = new CountDownLatch(1);
336+
service.updateRemoteCluster(
337+
"cluster_2",
338+
Arrays.asList(c2N1Address, c2N2Address),
339+
connectionListener(secondLatch));
340+
secondLatch.await();
341+
342+
// in each cluster the node started with dedicatedMaster settings must not be connected,
// while the node started with data settings must be
assertTrue(service.isCrossClusterSearchEnabled());
343+
assertTrue(service.isRemoteClusterRegistered("cluster_1"));
344+
assertFalse(service.isRemoteNodeConnected("cluster_1", c1N1Node));
345+
assertTrue(service.isRemoteNodeConnected("cluster_1", c1N2Node));
346+
assertTrue(service.isRemoteClusterRegistered("cluster_2"));
347+
assertFalse(service.isRemoteNodeConnected("cluster_2", c2N1Node));
348+
assertTrue(service.isRemoteNodeConnected("cluster_2", c2N2Node));
349+
}
350+
}
351+
}
352+
}
353+
282354
/**
 * Creates a listener for connection attempts made by the tests: the first callback counts down the
 * given latch and the second calls {@code fail()}. Presumably these are the response and failure
 * handlers of {@code ActionListener.wrap} -- confirm against its signature.
 *
 * @param latch the latch released by the first callback
 */
private ActionListener<Void> connectionListener(final CountDownLatch latch) {
283355
return ActionListener.wrap(x -> latch.countDown(), x -> fail());
284356
}
@@ -630,4 +702,115 @@ public void testRemoteClusterSkipIfDisconnectedSetting() {
630702
}
631703
}
632704
}
705+
706+
/**
 * Checks the predicate returned for empty settings against nodes on {@code Version.CURRENT} with
 * different role sets: only the node whose sole role is MASTER is rejected.
 */
public void testGetNodePredicateNodeRoles() {
707+
TransportAddress address = new TransportAddress(TransportAddress.META_ADDRESS, 0);
708+
Predicate<DiscoveryNode> nodePredicate = RemoteClusterService.getNodePredicate(Settings.EMPTY);
709+
{
710+
// every role
DiscoveryNode all = new DiscoveryNode("id", address, Collections.emptyMap(),
711+
new HashSet<>(EnumSet.allOf(DiscoveryNode.Role.class)), Version.CURRENT);
712+
assertTrue(nodePredicate.test(all));
713+
}
714+
{
715+
// master that also holds data: accepted
DiscoveryNode dataMaster = new DiscoveryNode("id", address, Collections.emptyMap(),
716+
new HashSet<>(EnumSet.of(DiscoveryNode.Role.DATA, DiscoveryNode.Role.MASTER)), Version.CURRENT);
717+
assertTrue(nodePredicate.test(dataMaster));
718+
}
719+
{
720+
// master only: the single rejected combination
DiscoveryNode dedicatedMaster = new DiscoveryNode("id", address, Collections.emptyMap(),
721+
new HashSet<>(EnumSet.of(DiscoveryNode.Role.MASTER)), Version.CURRENT);
722+
assertFalse(nodePredicate.test(dedicatedMaster));
723+
}
724+
{
725+
// ingest only
DiscoveryNode dedicatedIngest = new DiscoveryNode("id", address, Collections.emptyMap(),
726+
new HashSet<>(EnumSet.of(DiscoveryNode.Role.INGEST)), Version.CURRENT);
727+
assertTrue(nodePredicate.test(dedicatedIngest));
728+
}
729+
{
730+
// master that also does ingest: accepted
DiscoveryNode masterIngest = new DiscoveryNode("id", address, Collections.emptyMap(),
731+
new HashSet<>(EnumSet.of(DiscoveryNode.Role.INGEST, DiscoveryNode.Role.MASTER)), Version.CURRENT);
732+
assertTrue(nodePredicate.test(masterIngest));
733+
}
734+
{
735+
// data only
DiscoveryNode dedicatedData = new DiscoveryNode("id", address, Collections.emptyMap(),
736+
new HashSet<>(EnumSet.of(DiscoveryNode.Role.DATA)), Version.CURRENT);
737+
assertTrue(nodePredicate.test(dedicatedData));
738+
}
739+
{
740+
// data and ingest, not master
DiscoveryNode ingestData = new DiscoveryNode("id", address, Collections.emptyMap(),
741+
new HashSet<>(EnumSet.of(DiscoveryNode.Role.DATA, DiscoveryNode.Role.INGEST)), Version.CURRENT);
742+
assertTrue(nodePredicate.test(ingestData));
743+
}
744+
{
745+
// no roles at all
DiscoveryNode coordOnly = new DiscoveryNode("id", address, Collections.emptyMap(),
746+
new HashSet<>(EnumSet.noneOf(DiscoveryNode.Role.class)), Version.CURRENT);
747+
assertTrue(nodePredicate.test(coordOnly));
748+
}
749+
}
750+
751+
/**
 * Checks that, for a node holding every role and with empty settings, the predicate result equals
 * {@code Version.CURRENT.isCompatible(version)} for a randomly chosen node version.
 */
public void testGetNodePredicateNodeVersion() {
752+
TransportAddress address = new TransportAddress(TransportAddress.META_ADDRESS, 0);
753+
// all roles, so the role part of the predicate cannot be the reason for a rejection
Set<DiscoveryNode.Role> roles = new HashSet<>(EnumSet.allOf(DiscoveryNode.Role.class));
754+
Predicate<DiscoveryNode> nodePredicate = RemoteClusterService.getNodePredicate(Settings.EMPTY);
755+
Version version = VersionUtils.randomVersion(random());
756+
DiscoveryNode node = new DiscoveryNode("id", address, Collections.emptyMap(), roles, version);
757+
assertThat(nodePredicate.test(node), equalTo(Version.CURRENT.isCompatible(version)));
758+
}
759+
760+
/**
 * Checks the attribute filter: with {@code search.remote.node.attr} set to {@code gateway}, only a
 * node whose {@code gateway} attribute is {@code "true"} is accepted. Each node is also tested
 * against the predicate for empty settings, which accepts all three.
 */
public void testGetNodePredicateNodeAttrs() {
761+
TransportAddress address = new TransportAddress(TransportAddress.META_ADDRESS, 0);
762+
Set<DiscoveryNode.Role> roles = new HashSet<>(EnumSet.allOf(DiscoveryNode.Role.class));
763+
Settings settings = Settings.builder().put("search.remote.node.attr", "gateway").build();
764+
Predicate<DiscoveryNode> nodePredicate = RemoteClusterService.getNodePredicate(settings);
765+
{
766+
// attribute present but "false": rejected when the attribute filter is configured
DiscoveryNode nonGatewayNode = new DiscoveryNode("id", address, Collections.singletonMap("gateway", "false"),
767+
roles, Version.CURRENT);
768+
assertFalse(nodePredicate.test(nonGatewayNode));
769+
assertTrue(RemoteClusterService.getNodePredicate(Settings.EMPTY).test(nonGatewayNode));
770+
}
771+
{
772+
// attribute "true": accepted with and without the filter
DiscoveryNode gatewayNode = new DiscoveryNode("id", address, Collections.singletonMap("gateway", "true"),
773+
roles, Version.CURRENT);
774+
assertTrue(nodePredicate.test(gatewayNode));
775+
assertTrue(RemoteClusterService.getNodePredicate(Settings.EMPTY).test(gatewayNode));
776+
}
777+
{
778+
// attribute missing: rejected when the attribute filter is configured
DiscoveryNode noAttrNode = new DiscoveryNode("id", address, Collections.emptyMap(), roles, Version.CURRENT);
779+
assertFalse(nodePredicate.test(noAttrNode));
780+
assertTrue(RemoteClusterService.getNodePredicate(Settings.EMPTY).test(noAttrNode));
781+
}
782+
}
783+
784+
/**
 * Checks how the role, attribute and version conditions combine when {@code search.remote.node.attr}
 * is set to {@code gateway}: a node is accepted only if it is not master-only, carries
 * {@code gateway=true} and runs a compatible version. Each rejecting condition is exercised on its own.
 */
public void testGetNodePredicatesCombination() {
785+
TransportAddress address = new TransportAddress(TransportAddress.META_ADDRESS, 0);
786+
Settings settings = Settings.builder().put("search.remote.node.attr", "gateway").build();
787+
Predicate<DiscoveryNode> nodePredicate = RemoteClusterService.getNodePredicate(settings);
788+
Set<DiscoveryNode.Role> allRoles = new HashSet<>(EnumSet.allOf(DiscoveryNode.Role.class));
789+
Set<DiscoveryNode.Role> dedicatedMasterRoles = new HashSet<>(EnumSet.of(DiscoveryNode.Role.MASTER));
790+
{
791+
// master-only node tagged as gateway: the role condition alone rejects it
DiscoveryNode node = new DiscoveryNode("id", address, Collections.singletonMap("gateway", "true"),
792+
dedicatedMasterRoles, Version.CURRENT);
793+
assertFalse(nodePredicate.test(node));
794+
}
795+
{
796+
// master-only node that is not a gateway: rejected by both role and attribute
DiscoveryNode node = new DiscoveryNode("id", address, Collections.singletonMap("gateway", "false"),
797+
dedicatedMasterRoles, Version.CURRENT);
798+
assertFalse(nodePredicate.test(node));
799+
}
800+
{
801+
// node with every role that is not a gateway: the attribute condition alone rejects it
// (this case previously duplicated the master-only one above, leaving this combination untested)
DiscoveryNode node = new DiscoveryNode("id", address, Collections.singletonMap("gateway", "false"),
802+
allRoles, Version.CURRENT);
803+
assertFalse(nodePredicate.test(node));
804+
}
805+
{
806+
// every role, gateway attribute set, current version: the only accepted combination
DiscoveryNode node = new DiscoveryNode("id", address, Collections.singletonMap("gateway", "true"),
807+
allRoles, Version.CURRENT);
808+
assertTrue(nodePredicate.test(node));
809+
}
810+
{
811+
// same node on V_5_3_0, which this test expects to be incompatible with Version.CURRENT
DiscoveryNode node = new DiscoveryNode("id", address, Collections.singletonMap("gateway", "true"),
812+
allRoles, Version.V_5_3_0);
813+
assertFalse(nodePredicate.test(node));
814+
}
815+
}
633816
}

0 commit comments

Comments
 (0)