Skip to content

Commit 4f805de

Browse files
committed
Only auto-expand replicas with allocation filtering when all nodes upgraded (#50361)
This is a follow-up to #48974. It ensures that replicas are only auto-expanded according to allocation filtering rules once all nodes have been upgraded to a version that supports this, which helps with orchestrating cluster upgrades.
1 parent 40bce49 commit 4f805de

File tree

3 files changed

+106
-7
lines changed

3 files changed

+106
-7
lines changed

qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java

+35
Original file line numberDiff line numberDiff line change
@@ -774,4 +774,39 @@ public void testTurnOffTranslogRetentionAfterUpgraded() throws Exception {
774774
assertEmptyTranslog(index);
775775
}
776776
}
777+
778+
/**
 * Verifies, at the current stage of the rolling upgrade, how many replicas an index with
 * auto-expand "0-all" has when one node is excluded via an allocation filter on "_id".
 *
 * While the minimum node version is before 7.6.0 the expected replica count is
 * (number of nodes - 1); once it is on or after 7.6.0 the expected count is
 * (number of nodes - 2).
 *
 * The index is only created while the cluster type is OLD; this method does not create it in
 * any other stage.
 */
public void testAutoExpandIndicesDuringRollingUpgrade() throws Exception {
779+
final String indexName = "test-auto-expand-filtering";
780+
final Version minimumNodeVersion = minimumNodeVersion();
781+
782+
// Read the "nodes" object of the _nodes response and keep its keys in a list.
Response response = client().performRequest(new Request("GET", "_nodes"));
783+
ObjectPath objectPath = ObjectPath.createFromResponse(response);
784+
final Map<String, Object> nodeMap = objectPath.evaluate("nodes");
785+
List<String> nodes = new ArrayList<>(nodeMap.keySet());
786+
787+
if (CLUSTER_TYPE == ClusterType.OLD) {
788+
// One shard, auto-expand "0-all", and one of the nodes excluded by its "_id".
// NOTE(review): nodes.get(randomInt(2)) presumably relies on the cluster having at least
// three nodes (randomInt(2) appears to yield 0..2) - confirm against the test cluster setup.
createIndex(indexName, Settings.builder()
789+
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
790+
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomInt(2))
791+
.put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-all")
792+
.put(IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_PREFIX + "._id", nodes.get(randomInt(2)))
793+
.build());
794+
}
795+
796+
ensureGreen(indexName);
797+
798+
// The setting value is converted through toString() before being parsed as an int.
final int numberOfReplicas = Integer.parseInt(
799+
getIndexSettingsAsMap(indexName).get(IndexMetaData.SETTING_NUMBER_OF_REPLICAS).toString());
800+
if (minimumNodeVersion.onOrAfter(Version.V_7_6_0)) {
801+
// All nodes are on 7.6.0 or later: one node fewer than in the branch below is expected,
// presumably because the excluded node is no longer counted.
assertEquals(nodes.size() - 2, numberOfReplicas);
802+
} else {
803+
// At least one node is older than 7.6.0: expect one replica fewer than the node count.
assertEquals(nodes.size() - 1, numberOfReplicas);
804+
}
805+
}
806+
807+
/**
 * Returns the "settings" entry nested under the given index name in the result of
 * {@code getIndexSettings(index)}.
 *
 * @param index name of the index whose settings are looked up
 * @return the value stored under "settings" for that index, cast to a map
 * @throws IOException declared by this helper; presumably propagated from getIndexSettings
 */
// Unchecked casts: the result is treated as index name -> { "settings" -> { ... } }.
@SuppressWarnings("unchecked")
808+
private Map<String, Object> getIndexSettingsAsMap(String index) throws IOException {
809+
Map<String, Object> indexSettings = getIndexSettings(index);
810+
return (Map<String, Object>)((Map<String, Object>) indexSettings.get(index)).get("settings");
811+
}
777812
}

server/src/main/java/org/elasticsearch/cluster/metadata/AutoExpandReplicas.java

+10-4
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.elasticsearch.cluster.metadata;
2020

2121
import com.carrotsearch.hppc.cursors.ObjectCursor;
22+
import org.elasticsearch.Version;
2223
import org.elasticsearch.cluster.node.DiscoveryNode;
2324
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
2425
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
@@ -105,11 +106,16 @@ int getMaxReplicas(int numDataNodes) {
105106
private OptionalInt getDesiredNumberOfReplicas(IndexMetaData indexMetaData, RoutingAllocation allocation) {
106107
if (enabled) {
107108
int numMatchingDataNodes = 0;
108-
for (ObjectCursor<DiscoveryNode> cursor : allocation.nodes().getDataNodes().values()) {
109-
Decision decision = allocation.deciders().shouldAutoExpandToNode(indexMetaData, cursor.value, allocation);
110-
if (decision.type() != Decision.Type.NO) {
111-
numMatchingDataNodes ++;
109+
// Only start using new logic once all nodes are migrated to 7.6.0, avoiding disruption during an upgrade
110+
if (allocation.nodes().getMinNodeVersion().onOrAfter(Version.V_7_6_0)) {
111+
for (ObjectCursor<DiscoveryNode> cursor : allocation.nodes().getDataNodes().values()) {
112+
Decision decision = allocation.deciders().shouldAutoExpandToNode(indexMetaData, cursor.value, allocation);
113+
if (decision.type() != Decision.Type.NO) {
114+
numMatchingDataNodes ++;
115+
}
112116
}
117+
} else {
118+
numMatchingDataNodes = allocation.nodes().getDataNodes().size();
113119
}
114120

115121
final int min = getMinReplicas();

server/src/test/java/org/elasticsearch/cluster/metadata/AutoExpandReplicasTests.java

+61-3
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.elasticsearch.Version;
2222
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequest;
2323
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
24+
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
2425
import org.elasticsearch.action.support.ActiveShardCount;
2526
import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
2627
import org.elasticsearch.cluster.ClusterState;
@@ -32,6 +33,7 @@
3233
import org.elasticsearch.common.settings.Settings;
3334
import org.elasticsearch.indices.cluster.ClusterStateChanges;
3435
import org.elasticsearch.test.ESTestCase;
36+
import org.elasticsearch.test.VersionUtils;
3537
import org.elasticsearch.threadpool.TestThreadPool;
3638
import org.elasticsearch.threadpool.ThreadPool;
3739

@@ -46,6 +48,7 @@
4648

4749
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS;
4850
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
51+
import static org.hamcrest.Matchers.equalTo;
4952
import static org.hamcrest.Matchers.everyItem;
5053
import static org.hamcrest.Matchers.isIn;
5154

@@ -104,12 +107,15 @@ public void testInvalidValues() {
104107

105108
private static final AtomicInteger nodeIdGenerator = new AtomicInteger();
106109

107-
protected DiscoveryNode createNode(DiscoveryNodeRole... mustHaveRoles) {
110+
/**
 * Creates a discovery node with the given version, a freshly generated id that is also passed
 * as the second constructor argument, and a random subset of the built-in roles extended with
 * the roles in {@code mustHaveRoles}.
 *
 * @param version       version passed to the created node (added line in this diff)
 * @param mustHaveRoles roles that are always added to the randomly chosen ones
 * @return the newly created node
 */
protected DiscoveryNode createNode(Version version, DiscoveryNodeRole... mustHaveRoles) {
108111
Set<DiscoveryNodeRole> roles = new HashSet<>(randomSubsetOf(DiscoveryNodeRole.BUILT_IN_ROLES));
109112
Collections.addAll(roles, mustHaveRoles);
110113
// Ids are "node_" followed by a zero-padded counter taken from the shared nodeIdGenerator.
final String id = String.format(Locale.ROOT, "node_%03d", nodeIdGenerator.incrementAndGet());
111-
return new DiscoveryNode(id, id, buildNewFakeTransportAddress(), Collections.emptyMap(), roles,
112-
Version.CURRENT);
114+
return new DiscoveryNode(id, id, buildNewFakeTransportAddress(), Collections.emptyMap(), roles, version);
115+
}
116+
117+
/**
 * Convenience overload that creates a node on {@code Version.CURRENT} by delegating to
 * {@code createNode(Version, DiscoveryNodeRole...)}.
 *
 * @param mustHaveRoles roles passed through to the delegate
 * @return the newly created node
 */
protected DiscoveryNode createNode(DiscoveryNodeRole... mustHaveRoles) {
118+
return createNode(Version.CURRENT, mustHaveRoles);
113119
}
114120

115121
/**
@@ -200,4 +206,56 @@ public void testAutoExpandWhenNodeLeavesAndPossiblyRejoins() throws InterruptedE
200206
terminate(threadPool);
201207
}
202208
}
209+
210+
/**
 * Checks that auto-expansion ignores allocation filtering while a pre-7.6.0 node is in the
 * cluster and takes it into account once that node has been removed.
 *
 * Steps: build a cluster state containing a single old node, create an index with one shard
 * and auto-expand "0-all", add a 7.6.0 node, exclude the old node by "_name", and compare the
 * number of shard copies before and after removing the old node.
 */
public void testOnlyAutoExpandAllocationFilteringAfterAllNodesUpgraded() {
211+
final ThreadPool threadPool = new TestThreadPool(getClass().getName());
212+
final ClusterStateChanges cluster = new ClusterStateChanges(xContentRegistry(), threadPool);
213+
214+
try {
215+
List<DiscoveryNode> allNodes = new ArrayList<>();
216+
// Old node: random version between 7.0.0 and 7.5.1, with master and data roles.
DiscoveryNode oldNode = createNode(VersionUtils.randomVersionBetween(random(), Version.V_7_0_0, Version.V_7_5_1),
217+
DiscoveryNodeRole.MASTER_ROLE, DiscoveryNodeRole.DATA_ROLE); // local node is the master
218+
allNodes.add(oldNode);
219+
// oldNode is passed for both leading arguments and is the only node in the initial state.
ClusterState state = ClusterStateCreationUtils.state(oldNode, oldNode, allNodes.toArray(new DiscoveryNode[0]));
220+
221+
CreateIndexRequest request = new CreateIndexRequest("index",
222+
Settings.builder()
223+
.put(SETTING_NUMBER_OF_SHARDS, 1)
224+
.put(SETTING_AUTO_EXPAND_REPLICAS, "0-all").build())
225+
.waitForActiveShards(ActiveShardCount.NONE);
226+
state = cluster.createIndex(state, request);
227+
assertTrue(state.metaData().hasIndex("index"));
228+
// Start initializing shards and reroute until every copy of shard 0 is started.
while (state.routingTable().index("index").shard(0).allShardsStarted() == false) {
229+
logger.info(state);
230+
state = cluster.applyStartedShards(state,
231+
state.routingTable().index("index").shard(0).shardsWithState(ShardRoutingState.INITIALIZING));
232+
state = cluster.reroute(state, new ClusterRerouteRequest());
233+
}
234+
235+
DiscoveryNode newNode = createNode(Version.V_7_6_0,
236+
DiscoveryNodeRole.MASTER_ROLE, DiscoveryNodeRole.DATA_ROLE); // upgraded node; added to the state below, oldNode stays the one the state was built with
237+
238+
state = cluster.addNodes(state, Collections.singletonList(newNode));
239+
240+
// use allocation filtering
241+
state = cluster.updateSettings(state, new UpdateSettingsRequest("index").settings(Settings.builder()
242+
.put(IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_PREFIX + "._name", oldNode.getName()).build()));
243+
244+
// Same start-and-reroute loop as above, now with two nodes and the exclude filter in place.
while (state.routingTable().index("index").shard(0).allShardsStarted() == false) {
245+
logger.info(state);
246+
state = cluster.applyStartedShards(state,
247+
state.routingTable().index("index").shard(0).shardsWithState(ShardRoutingState.INITIALIZING));
248+
state = cluster.reroute(state, new ClusterRerouteRequest());
249+
}
250+
251+
// check that presence of old node means that auto-expansion does not take allocation filtering into account
252+
assertThat(state.routingTable().index("index").shard(0).size(), equalTo(2));
253+
254+
// remove old node and check that auto-expansion takes allocation filtering into account
255+
state = cluster.removeNodes(state, Collections.singletonList(oldNode));
256+
assertThat(state.routingTable().index("index").shard(0).size(), equalTo(1));
257+
} finally {
258+
// Always shut the test thread pool down, even when an assertion fails.
terminate(threadPool);
259+
}
260+
}
203261
}

0 commit comments

Comments
 (0)