Auto-expand indices according to allocation filtering rules #48974

Merged: 6 commits, Nov 15, 2019
11 changes: 6 additions & 5 deletions docs/reference/index-modules.asciidoc
@@ -96,11 +96,12 @@ specific index module:
Auto-expand the number of replicas based on the number of data nodes in the cluster.
Set to a dash delimited lower and upper bound (e.g. `0-5`) or use `all`
for the upper bound (e.g. `0-all`). Defaults to `false` (i.e. disabled).
Note that the auto-expanded number of replicas does not take any other allocation
rules into account, such as <<allocation-awareness,shard allocation awareness>>,
<<shard-allocation-filtering,filtering>> or <<allocation-total-shards,total shards per node>>,
and this can lead to the cluster health becoming `YELLOW` if the applicable rules
prevent all the replicas from being allocated.
Note that the auto-expanded number of replicas only takes
<<shard-allocation-filtering,allocation filtering>> rules into account, but ignores
any other allocation rules such as <<allocation-awareness,shard allocation awareness>>
and <<allocation-total-shards,total shards per node>>, and this can lead to the
cluster health becoming `YELLOW` if the applicable rules prevent all the replicas
from being allocated.

`index.search.idle.after`::
How long a shard can not receive a search or get request until it's considered
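As an illustration of the auto-expand behaviour documented above, here is a minimal sketch using the high-level REST client: it creates an index with `index.auto_expand_replicas: 0-all` and then excludes one node by name with an index-level allocation filter, which with this change shrinks the auto-expanded replica count instead of leaving the cluster `YELLOW`. The host, index name, and node name are placeholders, not part of this PR.

import org.apache.http.HttpHost;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.common.settings.Settings;

public class AutoExpandFilteringExample {
    public static void main(String[] args) throws Exception {
        // Assumes a cluster reachable on localhost:9200; "test" and "node-1" are placeholder names.
        try (RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost("localhost", 9200, "http")))) {

            // Replica count follows the number of eligible data nodes (0 up to all of them).
            client.indices().create(new CreateIndexRequest("test")
                    .settings(Settings.builder().put("index.auto_expand_replicas", "0-all").build()),
                RequestOptions.DEFAULT);

            // Exclude one node; with this change the auto-expanded replica count is recomputed
            // against the nodes that pass the filter, so the index can stay green.
            client.indices().putSettings(new UpdateSettingsRequest("test")
                    .settings(Settings.builder().put("index.routing.allocation.exclude._name", "node-1").build()),
                RequestOptions.DEFAULT);
        }
    }
}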
@@ -18,7 +18,10 @@
*/
package org.elasticsearch.cluster.metadata;

import org.elasticsearch.cluster.node.DiscoveryNodes;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
@@ -99,11 +102,19 @@ int getMaxReplicas(int numDataNodes) {
return Math.min(maxReplicas, numDataNodes-1);
}

private OptionalInt getDesiredNumberOfReplicas(int numDataNodes) {
private OptionalInt getDesiredNumberOfReplicas(IndexMetaData indexMetaData, RoutingAllocation allocation) {
if (enabled) {
int numMatchingDataNodes = 0;
for (ObjectCursor<DiscoveryNode> cursor : allocation.nodes().getDataNodes().values()) {
Decision decision = allocation.deciders().shouldAutoExpandToNode(indexMetaData, cursor.value, allocation);
if (decision.type() != Decision.Type.NO) {
numMatchingDataNodes ++;
}
}

final int min = getMinReplicas();
final int max = getMaxReplicas(numDataNodes);
int numberOfReplicas = numDataNodes - 1;
final int max = getMaxReplicas(numMatchingDataNodes);
int numberOfReplicas = numMatchingDataNodes - 1;
if (numberOfReplicas < min) {
numberOfReplicas = min;
} else if (numberOfReplicas > max) {
@@ -128,16 +139,13 @@ public String toString() {
* The map has the desired number of replicas as key and the indices to update as value, as this allows the result
* of this method to be directly applied to RoutingTable.Builder#updateNumberOfReplicas.
*/
public static Map<Integer, List<String>> getAutoExpandReplicaChanges(MetaData metaData, DiscoveryNodes discoveryNodes) {
// used for translating "all" to a number
final int dataNodeCount = discoveryNodes.getDataNodes().size();

public static Map<Integer, List<String>> getAutoExpandReplicaChanges(MetaData metaData, RoutingAllocation allocation) {
Map<Integer, List<String>> nrReplicasChanged = new HashMap<>();

for (final IndexMetaData indexMetaData : metaData) {
if (indexMetaData.getState() == IndexMetaData.State.OPEN || isIndexVerifiedBeforeClosed(indexMetaData)) {
AutoExpandReplicas autoExpandReplicas = SETTING.get(indexMetaData.getSettings());
autoExpandReplicas.getDesiredNumberOfReplicas(dataNodeCount).ifPresent(numberOfReplicas -> {
autoExpandReplicas.getDesiredNumberOfReplicas(indexMetaData, allocation).ifPresent(numberOfReplicas -> {
if (numberOfReplicas != indexMetaData.getNumberOfReplicas()) {
nrReplicasChanged.computeIfAbsent(numberOfReplicas, ArrayList::new).add(indexMetaData.getIndex().getName());
}
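The javadoc above notes that the returned map is shaped so it can be applied directly to RoutingTable.Builder#updateNumberOfReplicas. A sketch of what that application might look like on the caller's side follows; the builder types and their updateNumberOfReplicas methods come from this codebase, but the helper class itself is hypothetical, not code from this PR.

import java.util.List;
import java.util.Map;

import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.RoutingTable;

final class AutoExpandChangesApplier {

    // Applies each "desired replicas -> indices" entry to the routing table and metadata builders,
    // mirroring what a caller such as AllocationService#adaptAutoExpandReplicas is expected to do.
    static void apply(Map<Integer, List<String>> nrReplicasChanged,
                      RoutingTable.Builder routingTableBuilder,
                      MetaData.Builder metaDataBuilder) {
        for (Map.Entry<Integer, List<String>> entry : nrReplicasChanged.entrySet()) {
            final int numberOfReplicas = entry.getKey();
            final String[] indices = entry.getValue().toArray(new String[0]);
            routingTableBuilder.updateNumberOfReplicas(numberOfReplicas, indices);
            metaDataBuilder.updateNumberOfReplicas(numberOfReplicas, indices);
        }
    }
}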
@@ -254,8 +254,10 @@ public ClusterState disassociateDeadNodes(ClusterState clusterState, boolean rer
* Returns an updated cluster state if changes were necessary, or the identical cluster if no changes were required.
*/
public ClusterState adaptAutoExpandReplicas(ClusterState clusterState) {
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, clusterState.getRoutingNodes(), clusterState,
clusterInfoService.getClusterInfo(), currentNanoTime());
final Map<Integer, List<String>> autoExpandReplicaChanges =
AutoExpandReplicas.getAutoExpandReplicaChanges(clusterState.metaData(), clusterState.nodes());
AutoExpandReplicas.getAutoExpandReplicaChanges(clusterState.metaData(), allocation);
if (autoExpandReplicaChanges.isEmpty()) {
return clusterState;
} else {
@@ -279,7 +281,7 @@ public ClusterState adaptAutoExpandReplicas(ClusterState clusterState) {
}
final ClusterState fixedState = ClusterState.builder(clusterState).routingTable(routingTableBuilder.build())
.metaData(metaDataBuilder).build();
assert AutoExpandReplicas.getAutoExpandReplicaChanges(fixedState.metaData(), fixedState.nodes()).isEmpty();
assert AutoExpandReplicas.getAutoExpandReplicaChanges(fixedState.metaData(), allocation).isEmpty();
return fixedState;
}
}
@@ -408,7 +410,7 @@ private boolean hasDeadNodes(RoutingAllocation allocation) {

private void reroute(RoutingAllocation allocation) {
assert hasDeadNodes(allocation) == false : "dead nodes should be explicitly cleaned up. See disassociateDeadNodes";
assert AutoExpandReplicas.getAutoExpandReplicaChanges(allocation.metaData(), allocation.nodes()).isEmpty() :
assert AutoExpandReplicas.getAutoExpandReplicaChanges(allocation.metaData(), allocation).isEmpty() :
"auto-expand replicas out of sync with number of nodes in the cluster";

removeDelayMarkers(allocation);
@@ -20,6 +20,7 @@
package org.elasticsearch.cluster.routing.allocation.decider;

import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
@@ -80,6 +81,14 @@ public Decision canAllocate(RoutingNode node, RoutingAllocation allocation) {
return Decision.ALWAYS;
}

/**
* Returns a {@link Decision} whether shards of the given index should be auto-expanded to this node at this state of the
* {@link RoutingAllocation}. The default is {@link Decision#ALWAYS}.
*/
public Decision shouldAutoExpandToNode(IndexMetaData indexMetaData, DiscoveryNode node, RoutingAllocation allocation) {
return Decision.ALWAYS;
}

/**
* Returns a {@link Decision} whether the cluster can execute
* re-balanced operations at all.
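The new hook defaults to Decision.ALWAYS, so existing deciders are unaffected unless they opt in. A hypothetical custom decider that keeps auto-expanded replicas off nodes carrying a made-up `box_type: cold` attribute could look like the sketch below; the decider name and attribute are illustrative assumptions, not part of this PR.

import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;

public class ColdNodeAutoExpandDecider extends AllocationDecider {

    private static final String NAME = "cold_node_auto_expand";

    @Override
    public Decision shouldAutoExpandToNode(IndexMetaData indexMetaData, DiscoveryNode node, RoutingAllocation allocation) {
        // Do not count "cold" nodes when sizing auto-expanded replicas for this index.
        if ("cold".equals(node.getAttributes().get("box_type"))) {
            return allocation.decision(Decision.NO, NAME, "node is marked as cold");
        }
        return allocation.decision(Decision.YES, NAME, "node is not marked as cold");
    }
}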
@@ -22,6 +22,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
@@ -145,6 +146,26 @@ public Decision canAllocate(IndexMetaData indexMetaData, RoutingNode node, Routi
return ret;
}

@Override
public Decision shouldAutoExpandToNode(IndexMetaData indexMetaData, DiscoveryNode node, RoutingAllocation allocation) {
Decision.Multi ret = new Decision.Multi();
for (AllocationDecider allocationDecider : allocations) {
Decision decision = allocationDecider.shouldAutoExpandToNode(indexMetaData, node, allocation);
// short track if a NO is returned.
if (decision == Decision.NO) {
if (!allocation.debugDecision()) {
return decision;
} else {
ret.add(decision);
}
} else if (decision != Decision.ALWAYS
&& (allocation.getDebugMode() != EXCLUDE_YES_DECISIONS || decision.type() != Decision.Type.YES)) {
ret.add(decision);
}
}
return ret;
}

@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocation) {
Decision.Multi ret = new Decision.Multi();
@@ -20,6 +20,7 @@
package org.elasticsearch.cluster.routing.allocation.decider;

import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeFilters;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.RoutingNode;
@@ -109,20 +110,31 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
return allocation.decision(Decision.NO, NAME, explanation, initialRecoveryFilters);
}
}
return shouldFilter(shardRouting, node, allocation);
return shouldFilter(shardRouting, node.node(), allocation);
}

@Override
public Decision canAllocate(IndexMetaData indexMetaData, RoutingNode node, RoutingAllocation allocation) {
return shouldFilter(indexMetaData, node, allocation);
return shouldFilter(indexMetaData, node.node(), allocation);
}

@Override
public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return shouldFilter(shardRouting, node, allocation);
return shouldFilter(shardRouting, node.node(), allocation);
}

private Decision shouldFilter(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
@Override
public Decision shouldAutoExpandToNode(IndexMetaData indexMetaData, DiscoveryNode node, RoutingAllocation allocation) {
Decision decision = shouldClusterFilter(node, allocation);
if (decision != null) return decision;

decision = shouldIndexFilter(indexMetaData, node, allocation);
if (decision != null) return decision;

return allocation.decision(Decision.YES, NAME, "node passes include/exclude/require filters");
}

private Decision shouldFilter(ShardRouting shardRouting, DiscoveryNode node, RoutingAllocation allocation) {
Decision decision = shouldClusterFilter(node, allocation);
if (decision != null) return decision;

@@ -132,7 +144,7 @@ private Decision shouldFilter(ShardRouting shardRouting, RoutingNode node, Routi
return allocation.decision(Decision.YES, NAME, "node passes include/exclude/require filters");
}

private Decision shouldFilter(IndexMetaData indexMd, RoutingNode node, RoutingAllocation allocation) {
private Decision shouldFilter(IndexMetaData indexMd, DiscoveryNode node, RoutingAllocation allocation) {
Decision decision = shouldClusterFilter(node, allocation);
if (decision != null) return decision;

@@ -142,43 +154,43 @@ private Decision shouldFilter(IndexMetaData indexMd, RoutingNode node, RoutingAl
return allocation.decision(Decision.YES, NAME, "node passes include/exclude/require filters");
}

private Decision shouldIndexFilter(IndexMetaData indexMd, RoutingNode node, RoutingAllocation allocation) {
private Decision shouldIndexFilter(IndexMetaData indexMd, DiscoveryNode node, RoutingAllocation allocation) {
if (indexMd.requireFilters() != null) {
if (indexMd.requireFilters().match(node.node()) == false) {
if (indexMd.requireFilters().match(node) == false) {
return allocation.decision(Decision.NO, NAME, "node does not match index setting [%s] filters [%s]",
IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_PREFIX, indexMd.requireFilters());
}
}
if (indexMd.includeFilters() != null) {
if (indexMd.includeFilters().match(node.node()) == false) {
if (indexMd.includeFilters().match(node) == false) {
return allocation.decision(Decision.NO, NAME, "node does not match index setting [%s] filters [%s]",
IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_PREFIX, indexMd.includeFilters());
}
}
if (indexMd.excludeFilters() != null) {
if (indexMd.excludeFilters().match(node.node())) {
if (indexMd.excludeFilters().match(node)) {
return allocation.decision(Decision.NO, NAME, "node matches index setting [%s] filters [%s]",
IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING.getKey(), indexMd.excludeFilters());
}
}
return null;
}

private Decision shouldClusterFilter(RoutingNode node, RoutingAllocation allocation) {
private Decision shouldClusterFilter(DiscoveryNode node, RoutingAllocation allocation) {
if (clusterRequireFilters != null) {
if (clusterRequireFilters.match(node.node()) == false) {
if (clusterRequireFilters.match(node) == false) {
return allocation.decision(Decision.NO, NAME, "node does not match cluster setting [%s] filters [%s]",
CLUSTER_ROUTING_REQUIRE_GROUP_PREFIX, clusterRequireFilters);
}
}
if (clusterIncludeFilters != null) {
if (clusterIncludeFilters.match(node.node()) == false) {
if (clusterIncludeFilters.match(node) == false) {
return allocation.decision(Decision.NO, NAME, "node does not cluster setting [%s] filters [%s]",
CLUSTER_ROUTING_INCLUDE_GROUP_PREFIX, clusterIncludeFilters);
}
}
if (clusterExcludeFilters != null) {
if (clusterExcludeFilters.match(node.node())) {
if (clusterExcludeFilters.match(node)) {
return allocation.decision(Decision.NO, NAME, "node matches cluster setting [%s] filters [%s]",
CLUSTER_ROUTING_EXCLUDE_GROUP_PREFIX, clusterExcludeFilters);
}
@@ -21,6 +21,7 @@

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.AutoExpandReplicas;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
@@ -97,6 +98,45 @@ public void testDecommissionNodeNoReplicas() {
.execute().actionGet().getHits().getTotalHits().value, equalTo(100L));
}

public void testAutoExpandReplicasToFilteredNodes() {
logger.info("--> starting 2 nodes");
List<String> nodesIds = internalCluster().startNodes(2);
final String node_0 = nodesIds.get(0);
final String node_1 = nodesIds.get(1);
assertThat(cluster().size(), equalTo(2));

logger.info("--> creating an index with auto-expand replicas");
createIndex("test", Settings.builder()
.put(AutoExpandReplicas.SETTING.getKey(), "0-all")
.build());
ClusterState clusterState = client().admin().cluster().prepareState().execute().actionGet().getState();
assertThat(clusterState.metaData().index("test").getNumberOfReplicas(), equalTo(1));
ensureGreen("test");

logger.info("--> filter out the second node");
if (randomBoolean()) {
client().admin().cluster().prepareUpdateSettings()
.setTransientSettings(Settings.builder().put("cluster.routing.allocation.exclude._name", node_1))
.execute().actionGet();
} else {
client().admin().indices().prepareUpdateSettings("test")
.setSettings(Settings.builder().put("index.routing.allocation.exclude._name", node_1))
.execute().actionGet();
}
ensureGreen("test");

logger.info("--> verify all are allocated on node1 now");
clusterState = client().admin().cluster().prepareState().execute().actionGet().getState();
assertThat(clusterState.metaData().index("test").getNumberOfReplicas(), equalTo(0));
for (IndexRoutingTable indexRoutingTable : clusterState.routingTable()) {
for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
for (ShardRouting shardRouting : indexShardRoutingTable) {
assertThat(clusterState.nodes().get(shardRouting.currentNodeId()).getName(), equalTo(node_0));
}
}
}
}

public void testDisablingAllocationFiltering() {
logger.info("--> starting 2 nodes");
List<String> nodesIds = internalCluster().startNodes(2);