Skip to content

Commit af797a7

Browse files
committed
Auto-expand indices according to allocation filtering rules (#48974)
Honours allocation filtering rules when auto-expanding indices.
1 parent ca4f55f commit af797a7

File tree

7 files changed

+123
-30
lines changed

7 files changed

+123
-30
lines changed

docs/reference/index-modules.asciidoc

+6-5
Original file line numberDiff line numberDiff line change
@@ -96,11 +96,12 @@ specific index module:
9696
Auto-expand the number of replicas based on the number of data nodes in the cluster.
9797
Set to a dash delimited lower and upper bound (e.g. `0-5`) or use `all`
9898
for the upper bound (e.g. `0-all`). Defaults to `false` (i.e. disabled).
99-
Note that the auto-expanded number of replicas does not take any other allocation
100-
rules into account, such as <<allocation-awareness,shard allocation awareness>>,
101-
<<shard-allocation-filtering,filtering>> or <<allocation-total-shards,total shards per node>>,
102-
and this can lead to the cluster health becoming `YELLOW` if the applicable rules
103-
prevent all the replicas from being allocated.
99+
Note that the auto-expanded number of replicas only takes
100+
<<shard-allocation-filtering,allocation filtering>> rules into account, but ignores
101+
any other allocation rules such as <<allocation-awareness,shard allocation awareness>>
102+
and <<allocation-total-shards,total shards per node>>. This can lead to the
103+
cluster health becoming `YELLOW` if the applicable rules prevent all the replicas
104+
from being allocated.
104105

105106
`index.search.idle.after`::
106107
How long a shard can go without receiving a search or get request until it's considered

server/src/main/java/org/elasticsearch/cluster/metadata/AutoExpandReplicas.java

+17-9
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,10 @@
1818
*/
1919
package org.elasticsearch.cluster.metadata;
2020

21-
import org.elasticsearch.cluster.node.DiscoveryNodes;
21+
import com.carrotsearch.hppc.cursors.ObjectCursor;
22+
import org.elasticsearch.cluster.node.DiscoveryNode;
23+
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
24+
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
2225
import org.elasticsearch.common.Booleans;
2326
import org.elasticsearch.common.settings.Setting;
2427
import org.elasticsearch.common.settings.Setting.Property;
@@ -99,11 +102,19 @@ int getMaxReplicas(int numDataNodes) {
99102
return Math.min(maxReplicas, numDataNodes-1);
100103
}
101104

102-
private OptionalInt getDesiredNumberOfReplicas(int numDataNodes) {
105+
private OptionalInt getDesiredNumberOfReplicas(IndexMetaData indexMetaData, RoutingAllocation allocation) {
103106
if (enabled) {
107+
int numMatchingDataNodes = 0;
108+
for (ObjectCursor<DiscoveryNode> cursor : allocation.nodes().getDataNodes().values()) {
109+
Decision decision = allocation.deciders().shouldAutoExpandToNode(indexMetaData, cursor.value, allocation);
110+
if (decision.type() != Decision.Type.NO) {
111+
numMatchingDataNodes ++;
112+
}
113+
}
114+
104115
final int min = getMinReplicas();
105-
final int max = getMaxReplicas(numDataNodes);
106-
int numberOfReplicas = numDataNodes - 1;
116+
final int max = getMaxReplicas(numMatchingDataNodes);
117+
int numberOfReplicas = numMatchingDataNodes - 1;
107118
if (numberOfReplicas < min) {
108119
numberOfReplicas = min;
109120
} else if (numberOfReplicas > max) {
@@ -128,16 +139,13 @@ public String toString() {
128139
* The map has the desired number of replicas as key and the indices to update as value, as this allows the result
129140
* of this method to be directly applied to RoutingTable.Builder#updateNumberOfReplicas.
130141
*/
131-
public static Map<Integer, List<String>> getAutoExpandReplicaChanges(MetaData metaData, DiscoveryNodes discoveryNodes) {
132-
// used for translating "all" to a number
133-
final int dataNodeCount = discoveryNodes.getDataNodes().size();
134-
142+
public static Map<Integer, List<String>> getAutoExpandReplicaChanges(MetaData metaData, RoutingAllocation allocation) {
135143
Map<Integer, List<String>> nrReplicasChanged = new HashMap<>();
136144

137145
for (final IndexMetaData indexMetaData : metaData) {
138146
if (indexMetaData.getState() == IndexMetaData.State.OPEN || isIndexVerifiedBeforeClosed(indexMetaData)) {
139147
AutoExpandReplicas autoExpandReplicas = SETTING.get(indexMetaData.getSettings());
140-
autoExpandReplicas.getDesiredNumberOfReplicas(dataNodeCount).ifPresent(numberOfReplicas -> {
148+
autoExpandReplicas.getDesiredNumberOfReplicas(indexMetaData, allocation).ifPresent(numberOfReplicas -> {
141149
if (numberOfReplicas != indexMetaData.getNumberOfReplicas()) {
142150
nrReplicasChanged.computeIfAbsent(numberOfReplicas, ArrayList::new).add(indexMetaData.getIndex().getName());
143151
}

server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -253,8 +253,10 @@ public ClusterState disassociateDeadNodes(ClusterState clusterState, boolean rer
253253
* Returns an updated cluster state if changes were necessary, or the identical cluster if no changes were required.
254254
*/
255255
public ClusterState adaptAutoExpandReplicas(ClusterState clusterState) {
256+
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, clusterState.getRoutingNodes(), clusterState,
257+
clusterInfoService.getClusterInfo(), currentNanoTime());
256258
final Map<Integer, List<String>> autoExpandReplicaChanges =
257-
AutoExpandReplicas.getAutoExpandReplicaChanges(clusterState.metaData(), clusterState.nodes());
259+
AutoExpandReplicas.getAutoExpandReplicaChanges(clusterState.metaData(), allocation);
258260
if (autoExpandReplicaChanges.isEmpty()) {
259261
return clusterState;
260262
} else {
@@ -278,7 +280,7 @@ public ClusterState adaptAutoExpandReplicas(ClusterState clusterState) {
278280
}
279281
final ClusterState fixedState = ClusterState.builder(clusterState).routingTable(routingTableBuilder.build())
280282
.metaData(metaDataBuilder).build();
281-
assert AutoExpandReplicas.getAutoExpandReplicaChanges(fixedState.metaData(), fixedState.nodes()).isEmpty();
283+
assert AutoExpandReplicas.getAutoExpandReplicaChanges(fixedState.metaData(), allocation).isEmpty();
282284
return fixedState;
283285
}
284286
}
@@ -403,7 +405,7 @@ private boolean hasDeadNodes(RoutingAllocation allocation) {
403405

404406
private void reroute(RoutingAllocation allocation) {
405407
assert hasDeadNodes(allocation) == false : "dead nodes should be explicitly cleaned up. See disassociateDeadNodes";
406-
assert AutoExpandReplicas.getAutoExpandReplicaChanges(allocation.metaData(), allocation.nodes()).isEmpty() :
408+
assert AutoExpandReplicas.getAutoExpandReplicaChanges(allocation.metaData(), allocation).isEmpty() :
407409
"auto-expand replicas out of sync with number of nodes in the cluster";
408410

409411
removeDelayMarkers(allocation);

server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecider.java

+9
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.elasticsearch.cluster.routing.allocation.decider;
2121

2222
import org.elasticsearch.cluster.metadata.IndexMetaData;
23+
import org.elasticsearch.cluster.node.DiscoveryNode;
2324
import org.elasticsearch.cluster.routing.RoutingNode;
2425
import org.elasticsearch.cluster.routing.ShardRouting;
2526
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
@@ -80,6 +81,14 @@ public Decision canAllocate(RoutingNode node, RoutingAllocation allocation) {
8081
return Decision.ALWAYS;
8182
}
8283

84+
/**
85+
* Returns a {@link Decision} whether shards of the given index should be auto-expanded to this node at this state of the
86+
* {@link RoutingAllocation}. The default is {@link Decision#ALWAYS}.
87+
*/
88+
public Decision shouldAutoExpandToNode(IndexMetaData indexMetaData, DiscoveryNode node, RoutingAllocation allocation) {
89+
return Decision.ALWAYS;
90+
}
91+
8392
/**
8493
* Returns a {@link Decision} whether the cluster can execute
8594
* re-balanced operations at all.

server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDeciders.java

+21
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.logging.log4j.LogManager;
2323
import org.apache.logging.log4j.Logger;
2424
import org.elasticsearch.cluster.metadata.IndexMetaData;
25+
import org.elasticsearch.cluster.node.DiscoveryNode;
2526
import org.elasticsearch.cluster.routing.RoutingNode;
2627
import org.elasticsearch.cluster.routing.ShardRouting;
2728
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
@@ -145,6 +146,26 @@ public Decision canAllocate(IndexMetaData indexMetaData, RoutingNode node, Routi
145146
return ret;
146147
}
147148

149+
@Override
150+
public Decision shouldAutoExpandToNode(IndexMetaData indexMetaData, DiscoveryNode node, RoutingAllocation allocation) {
151+
Decision.Multi ret = new Decision.Multi();
152+
for (AllocationDecider allocationDecider : allocations) {
153+
Decision decision = allocationDecider.shouldAutoExpandToNode(indexMetaData, node, allocation);
154+
// short-circuit as soon as a NO is returned.
155+
if (decision == Decision.NO) {
156+
if (!allocation.debugDecision()) {
157+
return decision;
158+
} else {
159+
ret.add(decision);
160+
}
161+
} else if (decision != Decision.ALWAYS
162+
&& (allocation.getDebugMode() != EXCLUDE_YES_DECISIONS || decision.type() != Decision.Type.YES)) {
163+
ret.add(decision);
164+
}
165+
}
166+
return ret;
167+
}
168+
148169
@Override
149170
public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocation) {
150171
Decision.Multi ret = new Decision.Multi();

server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDecider.java

+25-13
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.elasticsearch.cluster.routing.allocation.decider;
2121

2222
import org.elasticsearch.cluster.metadata.IndexMetaData;
23+
import org.elasticsearch.cluster.node.DiscoveryNode;
2324
import org.elasticsearch.cluster.node.DiscoveryNodeFilters;
2425
import org.elasticsearch.cluster.routing.RecoverySource;
2526
import org.elasticsearch.cluster.routing.RoutingNode;
@@ -109,20 +110,31 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
109110
return allocation.decision(Decision.NO, NAME, explanation, initialRecoveryFilters);
110111
}
111112
}
112-
return shouldFilter(shardRouting, node, allocation);
113+
return shouldFilter(shardRouting, node.node(), allocation);
113114
}
114115

115116
@Override
116117
public Decision canAllocate(IndexMetaData indexMetaData, RoutingNode node, RoutingAllocation allocation) {
117-
return shouldFilter(indexMetaData, node, allocation);
118+
return shouldFilter(indexMetaData, node.node(), allocation);
118119
}
119120

120121
@Override
121122
public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
122-
return shouldFilter(shardRouting, node, allocation);
123+
return shouldFilter(shardRouting, node.node(), allocation);
123124
}
124125

125-
private Decision shouldFilter(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
126+
@Override
127+
public Decision shouldAutoExpandToNode(IndexMetaData indexMetaData, DiscoveryNode node, RoutingAllocation allocation) {
128+
Decision decision = shouldClusterFilter(node, allocation);
129+
if (decision != null) return decision;
130+
131+
decision = shouldIndexFilter(indexMetaData, node, allocation);
132+
if (decision != null) return decision;
133+
134+
return allocation.decision(Decision.YES, NAME, "node passes include/exclude/require filters");
135+
}
136+
137+
private Decision shouldFilter(ShardRouting shardRouting, DiscoveryNode node, RoutingAllocation allocation) {
126138
Decision decision = shouldClusterFilter(node, allocation);
127139
if (decision != null) return decision;
128140

@@ -132,7 +144,7 @@ private Decision shouldFilter(ShardRouting shardRouting, RoutingNode node, Routi
132144
return allocation.decision(Decision.YES, NAME, "node passes include/exclude/require filters");
133145
}
134146

135-
private Decision shouldFilter(IndexMetaData indexMd, RoutingNode node, RoutingAllocation allocation) {
147+
private Decision shouldFilter(IndexMetaData indexMd, DiscoveryNode node, RoutingAllocation allocation) {
136148
Decision decision = shouldClusterFilter(node, allocation);
137149
if (decision != null) return decision;
138150

@@ -142,43 +154,43 @@ private Decision shouldFilter(IndexMetaData indexMd, RoutingNode node, RoutingAl
142154
return allocation.decision(Decision.YES, NAME, "node passes include/exclude/require filters");
143155
}
144156

145-
private Decision shouldIndexFilter(IndexMetaData indexMd, RoutingNode node, RoutingAllocation allocation) {
157+
private Decision shouldIndexFilter(IndexMetaData indexMd, DiscoveryNode node, RoutingAllocation allocation) {
146158
if (indexMd.requireFilters() != null) {
147-
if (indexMd.requireFilters().match(node.node()) == false) {
159+
if (indexMd.requireFilters().match(node) == false) {
148160
return allocation.decision(Decision.NO, NAME, "node does not match index setting [%s] filters [%s]",
149161
IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_PREFIX, indexMd.requireFilters());
150162
}
151163
}
152164
if (indexMd.includeFilters() != null) {
153-
if (indexMd.includeFilters().match(node.node()) == false) {
165+
if (indexMd.includeFilters().match(node) == false) {
154166
return allocation.decision(Decision.NO, NAME, "node does not match index setting [%s] filters [%s]",
155167
IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_PREFIX, indexMd.includeFilters());
156168
}
157169
}
158170
if (indexMd.excludeFilters() != null) {
159-
if (indexMd.excludeFilters().match(node.node())) {
171+
if (indexMd.excludeFilters().match(node)) {
160172
return allocation.decision(Decision.NO, NAME, "node matches index setting [%s] filters [%s]",
161173
IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING.getKey(), indexMd.excludeFilters());
162174
}
163175
}
164176
return null;
165177
}
166178

167-
private Decision shouldClusterFilter(RoutingNode node, RoutingAllocation allocation) {
179+
private Decision shouldClusterFilter(DiscoveryNode node, RoutingAllocation allocation) {
168180
if (clusterRequireFilters != null) {
169-
if (clusterRequireFilters.match(node.node()) == false) {
181+
if (clusterRequireFilters.match(node) == false) {
170182
return allocation.decision(Decision.NO, NAME, "node does not match cluster setting [%s] filters [%s]",
171183
CLUSTER_ROUTING_REQUIRE_GROUP_PREFIX, clusterRequireFilters);
172184
}
173185
}
174186
if (clusterIncludeFilters != null) {
175-
if (clusterIncludeFilters.match(node.node()) == false) {
187+
if (clusterIncludeFilters.match(node) == false) {
176188
return allocation.decision(Decision.NO, NAME, "node does not cluster setting [%s] filters [%s]",
177189
CLUSTER_ROUTING_INCLUDE_GROUP_PREFIX, clusterIncludeFilters);
178190
}
179191
}
180192
if (clusterExcludeFilters != null) {
181-
if (clusterExcludeFilters.match(node.node())) {
193+
if (clusterExcludeFilters.match(node)) {
182194
return allocation.decision(Decision.NO, NAME, "node matches cluster setting [%s] filters [%s]",
183195
CLUSTER_ROUTING_EXCLUDE_GROUP_PREFIX, clusterExcludeFilters);
184196
}

server/src/test/java/org/elasticsearch/cluster/allocation/FilteringAllocationIT.java

+40
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.elasticsearch.cluster.ClusterState;
2323
import org.elasticsearch.cluster.health.ClusterHealthStatus;
24+
import org.elasticsearch.cluster.metadata.AutoExpandReplicas;
2425
import org.elasticsearch.cluster.metadata.IndexMetaData;
2526
import org.elasticsearch.cluster.routing.IndexRoutingTable;
2627
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
@@ -97,6 +98,45 @@ public void testDecommissionNodeNoReplicas() {
9798
.execute().actionGet().getHits().getTotalHits().value, equalTo(100L));
9899
}
99100

101+
public void testAutoExpandReplicasToFilteredNodes() {
102+
logger.info("--> starting 2 nodes");
103+
List<String> nodesIds = internalCluster().startNodes(2);
104+
final String node_0 = nodesIds.get(0);
105+
final String node_1 = nodesIds.get(1);
106+
assertThat(cluster().size(), equalTo(2));
107+
108+
logger.info("--> creating an index with auto-expand replicas");
109+
createIndex("test", Settings.builder()
110+
.put(AutoExpandReplicas.SETTING.getKey(), "0-all")
111+
.build());
112+
ClusterState clusterState = client().admin().cluster().prepareState().execute().actionGet().getState();
113+
assertThat(clusterState.metaData().index("test").getNumberOfReplicas(), equalTo(1));
114+
ensureGreen("test");
115+
116+
logger.info("--> filter out the second node");
117+
if (randomBoolean()) {
118+
client().admin().cluster().prepareUpdateSettings()
119+
.setTransientSettings(Settings.builder().put("cluster.routing.allocation.exclude._name", node_1))
120+
.execute().actionGet();
121+
} else {
122+
client().admin().indices().prepareUpdateSettings("test")
123+
.setSettings(Settings.builder().put("index.routing.allocation.exclude._name", node_1))
124+
.execute().actionGet();
125+
}
126+
ensureGreen("test");
127+
128+
logger.info("--> verify all are allocated on node1 now");
129+
clusterState = client().admin().cluster().prepareState().execute().actionGet().getState();
130+
assertThat(clusterState.metaData().index("test").getNumberOfReplicas(), equalTo(0));
131+
for (IndexRoutingTable indexRoutingTable : clusterState.routingTable()) {
132+
for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
133+
for (ShardRouting shardRouting : indexShardRoutingTable) {
134+
assertThat(clusterState.nodes().get(shardRouting.currentNodeId()).getName(), equalTo(node_0));
135+
}
136+
}
137+
}
138+
}
139+
100140
public void testDisablingAllocationFiltering() {
101141
logger.info("--> starting 2 nodes");
102142
List<String> nodesIds = internalCluster().startNodes(2);

0 commit comments

Comments
 (0)