Skip to content

Commit 16692e1

Browse files
committed
Put CCR tasks on (data && remote cluster clients) (#54146)
Today we assign CCR persistent tasks to nodes with the data role. However, a data node may not be capable of connecting to remote clusters, in which case the task will fail because it cannot connect to the remote cluster that holds the leader shard. Instead, we need to assign such tasks to nodes that are capable of connecting to remote clusters. This commit addresses this by allowing such persistent tasks to be assigned only to nodes that have both the data role and the remote cluster client role.
1 parent b55facd commit 16692e1

File tree

2 files changed

+124
-0
lines changed

2 files changed

+124
-0
lines changed

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java

+18
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.elasticsearch.cluster.metadata.AliasMetaData;
3232
import org.elasticsearch.cluster.metadata.IndexMetaData;
3333
import org.elasticsearch.cluster.metadata.MappingMetaData;
34+
import org.elasticsearch.cluster.node.DiscoveryNode;
3435
import org.elasticsearch.cluster.routing.IndexRoutingTable;
3536
import org.elasticsearch.cluster.service.ClusterService;
3637
import org.elasticsearch.common.CheckedConsumer;
@@ -54,6 +55,7 @@
5455
import org.elasticsearch.persistent.AllocatedPersistentTask;
5556
import org.elasticsearch.persistent.PersistentTaskState;
5657
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
58+
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment;
5759
import org.elasticsearch.persistent.PersistentTasksExecutor;
5860
import org.elasticsearch.tasks.TaskId;
5961
import org.elasticsearch.threadpool.Scheduler;
@@ -76,6 +78,7 @@
7678
import java.util.function.Consumer;
7779
import java.util.function.LongConsumer;
7880
import java.util.function.LongSupplier;
81+
import java.util.function.Predicate;
7982
import java.util.function.Supplier;
8083

8184
import static org.elasticsearch.xpack.ccr.CcrLicenseChecker.wrapClient;
@@ -115,6 +118,21 @@ public void validate(ShardFollowTask params, ClusterState clusterState) {
115118
}
116119
}
117120

121+
private static final Assignment NO_ASSIGNMENT = new Assignment(null, "no nodes found with data and remote cluster client roles");
122+
123+
@Override
124+
public Assignment getAssignment(final ShardFollowTask params, final ClusterState clusterState) {
125+
final DiscoveryNode node = selectLeastLoadedNode(
126+
clusterState,
127+
((Predicate<DiscoveryNode>) DiscoveryNode::isDataNode).and(DiscoveryNode::isRemoteClusterClient)
128+
);
129+
if (node == null) {
130+
return NO_ASSIGNMENT;
131+
} else {
132+
return new Assignment(node.getId(), "node is the least loaded data node and remote cluster client");
133+
}
134+
}
135+
118136
@Override
119137
protected AllocatedPersistentTask createTask(long id, String type, String action, TaskId parentTaskId,
120138
PersistentTasksCustomMetaData.PersistentTask<ShardFollowTask> taskInProgress,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.ccr.action;
8+
9+
import org.elasticsearch.Version;
10+
import org.elasticsearch.client.Client;
11+
import org.elasticsearch.cluster.ClusterName;
12+
import org.elasticsearch.cluster.ClusterState;
13+
import org.elasticsearch.cluster.node.DiscoveryNode;
14+
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
15+
import org.elasticsearch.cluster.node.DiscoveryNodes;
16+
import org.elasticsearch.cluster.service.ClusterService;
17+
import org.elasticsearch.common.UUIDs;
18+
import org.elasticsearch.common.settings.ClusterSettings;
19+
import org.elasticsearch.common.settings.Settings;
20+
import org.elasticsearch.common.settings.SettingsModule;
21+
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment;
22+
import org.elasticsearch.test.ESTestCase;
23+
import org.elasticsearch.threadpool.ThreadPool;
24+
import org.elasticsearch.xpack.ccr.CcrSettings;
25+
26+
import java.util.Arrays;
27+
import java.util.Collections;
28+
import java.util.HashSet;
29+
import java.util.Set;
30+
import java.util.function.BiConsumer;
31+
import java.util.function.Supplier;
32+
33+
import static org.hamcrest.Matchers.equalTo;
34+
import static org.mockito.Mockito.mock;
35+
import static org.mockito.Mockito.when;
36+
37+
public class ShardFollowTasksExecutorAssignmentTests extends ESTestCase {
38+
39+
public void testAssignmentToNodeWithDataAndRemoteClusterClientRoles() {
40+
runAssignmentTest(
41+
new HashSet<>(Arrays.asList(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE)),
42+
randomIntBetween(0, 8),
43+
() -> new HashSet<>(randomSubsetOf(new HashSet<>(Arrays.asList(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.INGEST_ROLE)))),
44+
(theSpecial, assignment) -> {
45+
assertTrue(assignment.isAssigned());
46+
assertThat(assignment.getExecutorNode(), equalTo(theSpecial.getId()));
47+
}
48+
);
49+
}
50+
51+
public void testDataRoleWithoutRemoteClusterServiceRole() {
52+
runNoAssignmentTest(Collections.singleton(DiscoveryNodeRole.DATA_ROLE));
53+
}
54+
55+
public void testRemoteClusterClientRoleWithoutDataRole() {
56+
runNoAssignmentTest(Collections.singleton(DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE));
57+
}
58+
59+
private void runNoAssignmentTest(final Set<DiscoveryNodeRole> roles) {
60+
runAssignmentTest(
61+
roles,
62+
0,
63+
Collections::emptySet,
64+
(theSpecial, assignment) -> {
65+
assertFalse(assignment.isAssigned());
66+
assertThat(assignment.getExplanation(), equalTo("no nodes found with data and remote cluster client roles"));
67+
}
68+
);
69+
}
70+
71+
private void runAssignmentTest(
72+
final Set<DiscoveryNodeRole> theSpecialRoles,
73+
final int numberOfOtherNodes,
74+
final Supplier<Set<DiscoveryNodeRole>> otherNodesRolesSupplier,
75+
final BiConsumer<DiscoveryNode, Assignment> consumer
76+
) {
77+
final ClusterService clusterService = mock(ClusterService.class);
78+
when(clusterService.getClusterSettings())
79+
.thenReturn(new ClusterSettings(Settings.EMPTY, Collections.singleton(CcrSettings.CCR_WAIT_FOR_METADATA_TIMEOUT)));
80+
final SettingsModule settingsModule = mock(SettingsModule.class);
81+
when(settingsModule.getSettings()).thenReturn(Settings.EMPTY);
82+
final ShardFollowTasksExecutor executor =
83+
new ShardFollowTasksExecutor(mock(Client.class), mock(ThreadPool.class), clusterService, settingsModule);
84+
final ClusterState.Builder clusterStateBuilder = ClusterState.builder(new ClusterName("test"));
85+
final DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder();
86+
final DiscoveryNode theSpecial = newNode(theSpecialRoles);
87+
nodesBuilder.add(theSpecial);
88+
for (int i = 0; i < numberOfOtherNodes; i++) {
89+
nodesBuilder.add(newNode(otherNodesRolesSupplier.get()));
90+
}
91+
clusterStateBuilder.nodes(nodesBuilder);
92+
final Assignment assignment = executor.getAssignment(mock(ShardFollowTask.class), clusterStateBuilder.build());
93+
consumer.accept(theSpecial, assignment);
94+
}
95+
96+
private static DiscoveryNode newNode(final Set<DiscoveryNodeRole> roles) {
97+
return new DiscoveryNode(
98+
"node_" + UUIDs.randomBase64UUID(random()),
99+
buildNewFakeTransportAddress(),
100+
Collections.emptyMap(),
101+
roles,
102+
Version.CURRENT
103+
);
104+
}
105+
106+
}

0 commit comments

Comments
 (0)