Skip to content

Commit a51dda8

Browse files
dnhatnjasontedor
andauthored
Assign follower primary to nodes with remote cluster client role (#59375)
The primary shards of follower indices during the bootstrap need to be on nodes with the remote cluster client role as those nodes reach out to the corresponding leader shards on the remote cluster to copy Lucene segment files and renew the retention leases. This commit introduces a new allocation decider that ensures bootstrapping follower primaries are allocated to nodes with the remote cluster client role. Relates #54146 Relates #53924 Closes #58534 Co-authored-by: Jason Tedor <[email protected]>
1 parent 9f22634 commit a51dda8

File tree

6 files changed

+416
-2
lines changed

6 files changed

+416
-2
lines changed

x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/FollowerFailOverIT.java

-1
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@ protected boolean reuseClusters() {
5151
return false;
5252
}
5353

54-
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/58534")
5554
public void testFailOverOnFollower() throws Exception {
5655
final String leaderIndex = "leader_test_failover";
5756
final String followerIndex = "follower_test_failover";
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.ccr;
8+
9+
import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplanation;
10+
import org.elasticsearch.action.support.ActiveShardCount;
11+
import org.elasticsearch.cluster.ClusterState;
12+
import org.elasticsearch.cluster.node.DiscoveryNode;
13+
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
14+
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
15+
import org.elasticsearch.cluster.routing.ShardRouting;
16+
import org.elasticsearch.cluster.routing.allocation.AllocationDecision;
17+
import org.elasticsearch.cluster.routing.allocation.NodeAllocationResult;
18+
import org.elasticsearch.common.settings.Settings;
19+
import org.elasticsearch.common.unit.TimeValue;
20+
import org.elasticsearch.common.xcontent.XContentType;
21+
import org.elasticsearch.test.NodeRoles;
22+
import org.elasticsearch.xpack.CcrIntegTestCase;
23+
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
24+
25+
import java.util.List;
26+
import java.util.Set;
27+
import java.util.stream.Collectors;
28+
import java.util.stream.Stream;
29+
30+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
31+
import static org.hamcrest.Matchers.equalTo;
32+
import static org.hamcrest.Matchers.in;
33+
34+
public class PrimaryFollowerAllocationIT extends CcrIntegTestCase {
35+
36+
@Override
37+
protected boolean reuseClusters() {
38+
return false;
39+
}
40+
41+
public void testDoNotAllocateFollowerPrimaryToNodesWithoutRemoteClusterClientRole() throws Exception {
42+
final String leaderIndex = "leader-not-allow-index";
43+
final String followerIndex = "follower-not-allow-index";
44+
final List<String> dataOnlyNodes = getFollowerCluster().startNodes(between(1, 2),
45+
NodeRoles.onlyRoles(Set.of(DiscoveryNodeRole.DATA_ROLE)));
46+
assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex)
47+
.setSource(getIndexSettings(between(1, 2), between(0, 1)), XContentType.JSON));
48+
final PutFollowAction.Request putFollowRequest = putFollow(leaderIndex, followerIndex);
49+
putFollowRequest.setSettings(Settings.builder()
50+
.put("index.routing.allocation.include._name", String.join(",", dataOnlyNodes))
51+
.build());
52+
putFollowRequest.waitForActiveShards(ActiveShardCount.ONE);
53+
putFollowRequest.timeout(TimeValue.timeValueSeconds(2));
54+
final PutFollowAction.Response response = followerClient().execute(PutFollowAction.INSTANCE, putFollowRequest).get();
55+
assertFalse(response.isFollowIndexShardsAcked());
56+
assertFalse(response.isIndexFollowingStarted());
57+
final ClusterAllocationExplanation explanation = followerClient().admin().cluster().prepareAllocationExplain()
58+
.setIndex(followerIndex).setShard(0).setPrimary(true).get().getExplanation();
59+
for (NodeAllocationResult nodeDecision : explanation.getShardAllocationDecision().getAllocateDecision().getNodeDecisions()) {
60+
assertThat(nodeDecision.getNodeDecision(), equalTo(AllocationDecision.NO));
61+
if (dataOnlyNodes.contains(nodeDecision.getNode().getName())) {
62+
final List<String> decisions = nodeDecision.getCanAllocateDecision().getDecisions()
63+
.stream().map(Object::toString).collect(Collectors.toList());
64+
assertThat("NO(shard is a primary follower and being bootstrapped, but node does not have the remote_cluster_client role)",
65+
in(decisions));
66+
}
67+
}
68+
}
69+
70+
public void testAllocateFollowerPrimaryToNodesWithRemoteClusterClientRole() throws Exception {
71+
final String leaderIndex = "leader-allow-index";
72+
final String followerIndex = "follower-allow-index";
73+
final List<String> dataOnlyNodes = getFollowerCluster().startNodes(between(2, 3),
74+
NodeRoles.onlyRoles(Set.of(DiscoveryNodeRole.DATA_ROLE)));
75+
final List<String> dataAndRemoteNodes = getFollowerCluster().startNodes(between(1, 2),
76+
NodeRoles.onlyRoles(Set.of(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE)));
77+
assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex)
78+
.setSource(getIndexSettings(between(1, 2), between(0, 1)), XContentType.JSON));
79+
final PutFollowAction.Request putFollowRequest = putFollow(leaderIndex, followerIndex);
80+
putFollowRequest.setSettings(Settings.builder()
81+
.put("index.routing.rebalance.enable", "none")
82+
.put("index.routing.allocation.include._name",
83+
Stream.concat(dataOnlyNodes.stream(), dataAndRemoteNodes.stream()).collect(Collectors.joining(",")))
84+
.build());
85+
final PutFollowAction.Response response = followerClient().execute(PutFollowAction.INSTANCE, putFollowRequest).get();
86+
assertTrue(response.isFollowIndexShardsAcked());
87+
assertTrue(response.isIndexFollowingStarted());
88+
ensureFollowerGreen(followerIndex);
89+
int numDocs = between(0, 20);
90+
for (int i = 0; i < numDocs; i++) {
91+
leaderClient().prepareIndex(leaderIndex).setSource("f", i).get();
92+
}
93+
// Empty follower primaries must be assigned to nodes with the remote cluster client role
94+
assertBusy(() -> {
95+
final ClusterState state = getFollowerCluster().client().admin().cluster().prepareState().get().getState();
96+
for (IndexShardRoutingTable shardRoutingTable : state.routingTable().index(followerIndex)) {
97+
final ShardRouting primaryShard = shardRoutingTable.primaryShard();
98+
assertTrue(primaryShard.assignedToNode());
99+
final DiscoveryNode assignedNode = state.nodes().get(primaryShard.currentNodeId());
100+
assertThat(assignedNode.getName(), in(dataAndRemoteNodes));
101+
}
102+
});
103+
// Follower primaries can be relocated to nodes without the remote cluster client role
104+
followerClient().admin().indices().prepareUpdateSettings(followerIndex)
105+
.setSettings(Settings.builder().put("index.routing.allocation.include._name", String.join(",", dataOnlyNodes)))
106+
.get();
107+
assertBusy(() -> {
108+
final ClusterState state = getFollowerCluster().client().admin().cluster().prepareState().get().getState();
109+
for (IndexShardRoutingTable shardRoutingTable : state.routingTable().index(followerIndex)) {
110+
for (ShardRouting shard : shardRoutingTable) {
111+
assertNotNull(shard.currentNodeId());
112+
final DiscoveryNode assignedNode = state.nodes().get(shard.currentNodeId());
113+
assertThat(assignedNode.getName(), in(dataOnlyNodes));
114+
}
115+
}
116+
});
117+
assertIndexFullyReplicatedToFollower(leaderIndex, followerIndex);
118+
// Follower primaries can be recovered from the existing copies on nodes without the remote cluster client role
119+
getFollowerCluster().fullRestart();
120+
ensureFollowerGreen(followerIndex);
121+
assertBusy(() -> {
122+
final ClusterState state = getFollowerCluster().client().admin().cluster().prepareState().get().getState();
123+
for (IndexShardRoutingTable shardRoutingTable : state.routingTable().index(followerIndex)) {
124+
for (ShardRouting shard : shardRoutingTable) {
125+
assertNotNull(shard.currentNodeId());
126+
final DiscoveryNode assignedNode = state.nodes().get(shard.currentNodeId());
127+
assertThat(assignedNode.getName(), in(dataOnlyNodes));
128+
}
129+
}
130+
});
131+
int moreDocs = between(0, 20);
132+
for (int i = 0; i < moreDocs; i++) {
133+
leaderClient().prepareIndex(leaderIndex).setSource("f", i).get();
134+
}
135+
assertIndexFullyReplicatedToFollower(leaderIndex, followerIndex);
136+
}
137+
}

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
1717
import org.elasticsearch.cluster.metadata.Metadata;
1818
import org.elasticsearch.cluster.node.DiscoveryNodes;
19+
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
1920
import org.elasticsearch.cluster.service.ClusterService;
2021
import org.elasticsearch.common.ParseField;
2122
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
@@ -36,6 +37,7 @@
3637
import org.elasticsearch.persistent.PersistentTaskParams;
3738
import org.elasticsearch.persistent.PersistentTasksExecutor;
3839
import org.elasticsearch.plugins.ActionPlugin;
40+
import org.elasticsearch.plugins.ClusterPlugin;
3941
import org.elasticsearch.plugins.EnginePlugin;
4042
import org.elasticsearch.plugins.PersistentTaskPlugin;
4143
import org.elasticsearch.plugins.Plugin;
@@ -75,6 +77,7 @@
7577
import org.elasticsearch.xpack.ccr.action.repositories.GetCcrRestoreFileChunkAction;
7678
import org.elasticsearch.xpack.ccr.action.repositories.PutCcrRestoreSessionAction;
7779
import org.elasticsearch.xpack.ccr.action.repositories.PutInternalCcrRepositoryAction;
80+
import org.elasticsearch.xpack.ccr.allocation.CcrPrimaryFollowerAllocationDecider;
7881
import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory;
7982
import org.elasticsearch.xpack.ccr.repository.CcrRepository;
8083
import org.elasticsearch.xpack.ccr.repository.CcrRestoreSourceService;
@@ -125,7 +128,7 @@
125128
/**
126129
* Container class for CCR functionality.
127130
*/
128-
public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, EnginePlugin, RepositoryPlugin {
131+
public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, EnginePlugin, RepositoryPlugin, ClusterPlugin {
129132

130133
public static final String CCR_THREAD_POOL_NAME = "ccr";
131134
public static final String CCR_CUSTOM_METADATA_KEY = "ccr";
@@ -370,4 +373,9 @@ public Collection<RequestValidators.RequestValidator<PutMappingRequest>> mapping
370373
public Collection<RequestValidators.RequestValidator<IndicesAliasesRequest>> indicesAliasesRequestValidators() {
371374
return Collections.singletonList(CcrRequests.CCR_INDICES_ALIASES_REQUEST_VALIDATOR);
372375
}
376+
377+
@Override
378+
public Collection<AllocationDecider> createAllocationDeciders(Settings settings, ClusterSettings clusterSettings) {
379+
return List.of(new CcrPrimaryFollowerAllocationDecider());
380+
}
373381
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
/*
8+
* Licensed to Elasticsearch under one or more contributor
9+
* license agreements. See the NOTICE file distributed with
10+
* this work for additional information regarding copyright
11+
* ownership. Elasticsearch licenses this file to you under
12+
* the Apache License, Version 2.0 (the "License"); you may
13+
* not use this file except in compliance with the License.
14+
* You may obtain a copy of the License at
15+
*
16+
* http://www.apache.org/licenses/LICENSE-2.0
17+
*
18+
* Unless required by applicable law or agreed to in writing,
19+
* software distributed under the License is distributed on an
20+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
21+
* KIND, either express or implied. See the License for the
22+
* specific language governing permissions and limitations
23+
* under the License.
24+
*/
25+
26+
package org.elasticsearch.xpack.ccr.allocation;
27+
28+
import org.elasticsearch.cluster.metadata.IndexMetadata;
29+
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
30+
import org.elasticsearch.cluster.routing.RecoverySource;
31+
import org.elasticsearch.cluster.routing.RoutingNode;
32+
import org.elasticsearch.cluster.routing.ShardRouting;
33+
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
34+
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
35+
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
36+
import org.elasticsearch.xpack.ccr.CcrSettings;
37+
38+
/**
39+
* An allocation decider that ensures primary shards of follower indices that are being bootstrapped are assigned to nodes that have the
40+
* remote cluster client role. This is necessary as those nodes reach out to the leader shards on the remote cluster to copy Lucene segment
41+
* files and periodically renew retention leases during the bootstrap.
42+
*/
43+
public final class CcrPrimaryFollowerAllocationDecider extends AllocationDecider {
44+
static final String NAME = "ccr_primary_follower";
45+
46+
@Override
47+
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
48+
final IndexMetadata indexMetadata = allocation.metadata().index(shardRouting.index());
49+
if (CcrSettings.CCR_FOLLOWING_INDEX_SETTING.get(indexMetadata.getSettings()) == false) {
50+
return allocation.decision(Decision.YES, NAME, "shard is not a follower and is not under the purview of this decider");
51+
}
52+
if (shardRouting.primary() == false) {
53+
return allocation.decision(Decision.YES, NAME, "shard is a replica follower and is not under the purview of this decider");
54+
}
55+
final RecoverySource recoverySource = shardRouting.recoverySource();
56+
if (recoverySource == null || recoverySource.getType() != RecoverySource.Type.SNAPSHOT) {
57+
return allocation.decision(Decision.YES, NAME,
58+
"shard is a primary follower but was bootstrapped already; hence is not under the purview of this decider");
59+
}
60+
if (node.node().isRemoteClusterClient() == false) {
61+
return allocation.decision(Decision.NO, NAME, "shard is a primary follower and being bootstrapped, but node does not have the "
62+
+ DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE.roleName() + " role");
63+
}
64+
return allocation.decision(Decision.YES, NAME,
65+
"shard is a primary follower and node has the " + DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE.roleName() + " role");
66+
}
67+
}

0 commit comments

Comments
 (0)