diff --git a/docs/reference/index-modules.asciidoc b/docs/reference/index-modules.asciidoc index 11babf3e58afe..8ad469682fbf1 100644 --- a/docs/reference/index-modules.asciidoc +++ b/docs/reference/index-modules.asciidoc @@ -96,11 +96,12 @@ specific index module: Auto-expand the number of replicas based on the number of data nodes in the cluster. Set to a dash delimited lower and upper bound (e.g. `0-5`) or use `all` for the upper bound (e.g. `0-all`). Defaults to `false` (i.e. disabled). - Note that the auto-expanded number of replicas does not take any other allocation - rules into account, such as <<allocation-awareness,shard allocation awareness>>, - <<shard-allocation-filtering,filtering>> or <<allocation-total-shards,total shards per node>>, - and this can lead to the cluster health becoming `YELLOW` if the applicable rules - prevent all the replicas from being allocated. + Note that the auto-expanded number of replicas only takes + <<shard-allocation-filtering,allocation filtering>> rules into account, but ignores + any other allocation rules such as <<allocation-awareness,shard allocation awareness>> + and <<allocation-total-shards,total shards per node>>, and this can lead to the + cluster health becoming `YELLOW` if the applicable rules prevent all the replicas + from being allocated. 
`index.search.idle.after`:: How long a shard can not receive a search or get request until it's considered diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/AutoExpandReplicas.java b/server/src/main/java/org/elasticsearch/cluster/metadata/AutoExpandReplicas.java index 16c48137be7da..346d755c37916 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/AutoExpandReplicas.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/AutoExpandReplicas.java @@ -18,7 +18,10 @@ */ package org.elasticsearch.cluster.metadata; -import org.elasticsearch.cluster.node.DiscoveryNodes; +import com.carrotsearch.hppc.cursors.ObjectCursor; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; +import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.common.Booleans; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; @@ -99,11 +102,19 @@ int getMaxReplicas(int numDataNodes) { return Math.min(maxReplicas, numDataNodes-1); } - private OptionalInt getDesiredNumberOfReplicas(int numDataNodes) { + private OptionalInt getDesiredNumberOfReplicas(IndexMetaData indexMetaData, RoutingAllocation allocation) { if (enabled) { + int numMatchingDataNodes = 0; + for (ObjectCursor cursor : allocation.nodes().getDataNodes().values()) { + Decision decision = allocation.deciders().shouldAutoExpandToNode(indexMetaData, cursor.value, allocation); + if (decision.type() != Decision.Type.NO) { + numMatchingDataNodes ++; + } + } + final int min = getMinReplicas(); - final int max = getMaxReplicas(numDataNodes); - int numberOfReplicas = numDataNodes - 1; + final int max = getMaxReplicas(numMatchingDataNodes); + int numberOfReplicas = numMatchingDataNodes - 1; if (numberOfReplicas < min) { numberOfReplicas = min; } else if (numberOfReplicas > max) { @@ -128,16 +139,13 @@ public String toString() { * The map 
has the desired number of replicas as key and the indices to update as value, as this allows the result * of this method to be directly applied to RoutingTable.Builder#updateNumberOfReplicas. */ - public static Map> getAutoExpandReplicaChanges(MetaData metaData, DiscoveryNodes discoveryNodes) { - // used for translating "all" to a number - final int dataNodeCount = discoveryNodes.getDataNodes().size(); - + public static Map> getAutoExpandReplicaChanges(MetaData metaData, RoutingAllocation allocation) { Map> nrReplicasChanged = new HashMap<>(); for (final IndexMetaData indexMetaData : metaData) { if (indexMetaData.getState() == IndexMetaData.State.OPEN || isIndexVerifiedBeforeClosed(indexMetaData)) { AutoExpandReplicas autoExpandReplicas = SETTING.get(indexMetaData.getSettings()); - autoExpandReplicas.getDesiredNumberOfReplicas(dataNodeCount).ifPresent(numberOfReplicas -> { + autoExpandReplicas.getDesiredNumberOfReplicas(indexMetaData, allocation).ifPresent(numberOfReplicas -> { if (numberOfReplicas != indexMetaData.getNumberOfReplicas()) { nrReplicasChanged.computeIfAbsent(numberOfReplicas, ArrayList::new).add(indexMetaData.getIndex().getName()); } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java index b7c288b703ed1..cf693f0c60a0e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java @@ -254,8 +254,10 @@ public ClusterState disassociateDeadNodes(ClusterState clusterState, boolean rer * Returns an updated cluster state if changes were necessary, or the identical cluster if no changes were required. 
*/ public ClusterState adaptAutoExpandReplicas(ClusterState clusterState) { + RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, clusterState.getRoutingNodes(), clusterState, + clusterInfoService.getClusterInfo(), currentNanoTime()); final Map> autoExpandReplicaChanges = - AutoExpandReplicas.getAutoExpandReplicaChanges(clusterState.metaData(), clusterState.nodes()); + AutoExpandReplicas.getAutoExpandReplicaChanges(clusterState.metaData(), allocation); if (autoExpandReplicaChanges.isEmpty()) { return clusterState; } else { @@ -279,7 +281,7 @@ public ClusterState adaptAutoExpandReplicas(ClusterState clusterState) { } final ClusterState fixedState = ClusterState.builder(clusterState).routingTable(routingTableBuilder.build()) .metaData(metaDataBuilder).build(); - assert AutoExpandReplicas.getAutoExpandReplicaChanges(fixedState.metaData(), fixedState.nodes()).isEmpty(); + assert AutoExpandReplicas.getAutoExpandReplicaChanges(fixedState.metaData(), allocation).isEmpty(); return fixedState; } } @@ -408,7 +410,7 @@ private boolean hasDeadNodes(RoutingAllocation allocation) { private void reroute(RoutingAllocation allocation) { assert hasDeadNodes(allocation) == false : "dead nodes should be explicitly cleaned up. 
See disassociateDeadNodes"; - assert AutoExpandReplicas.getAutoExpandReplicaChanges(allocation.metaData(), allocation.nodes()).isEmpty() : + assert AutoExpandReplicas.getAutoExpandReplicaChanges(allocation.metaData(), allocation).isEmpty() : "auto-expand replicas out of sync with number of nodes in the cluster"; removeDelayMarkers(allocation); diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecider.java index bd51b7d47b335..34901628eff7e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecider.java @@ -20,6 +20,7 @@ package org.elasticsearch.cluster.routing.allocation.decider; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; @@ -80,6 +81,14 @@ public Decision canAllocate(RoutingNode node, RoutingAllocation allocation) { return Decision.ALWAYS; } + /** + * Returns a {@link Decision} whether shards of the given index should be auto-expanded to this node at this state of the + * {@link RoutingAllocation}. The default is {@link Decision#ALWAYS}. + */ + public Decision shouldAutoExpandToNode(IndexMetaData indexMetaData, DiscoveryNode node, RoutingAllocation allocation) { + return Decision.ALWAYS; + } + /** * Returns a {@link Decision} whether the cluster can execute * re-balanced operations at all. 
diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDeciders.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDeciders.java index 5ab234c7e892d..f7ec43b5197ea 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDeciders.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDeciders.java @@ -22,6 +22,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; @@ -145,6 +146,26 @@ public Decision canAllocate(IndexMetaData indexMetaData, RoutingNode node, Routi return ret; } + @Override + public Decision shouldAutoExpandToNode(IndexMetaData indexMetaData, DiscoveryNode node, RoutingAllocation allocation) { + Decision.Multi ret = new Decision.Multi(); + for (AllocationDecider allocationDecider : allocations) { + Decision decision = allocationDecider.shouldAutoExpandToNode(indexMetaData, node, allocation); + // short track if a NO is returned. 
+ if (decision == Decision.NO) { + if (!allocation.debugDecision()) { + return decision; + } else { + ret.add(decision); + } + } else if (decision != Decision.ALWAYS + && (allocation.getDebugMode() != EXCLUDE_YES_DECISIONS || decision.type() != Decision.Type.YES)) { + ret.add(decision); + } + } + return ret; + } + @Override public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocation) { Decision.Multi ret = new Decision.Multi(); diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDecider.java index bd7ee7804ccec..19c5c517038df 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDecider.java @@ -20,6 +20,7 @@ package org.elasticsearch.cluster.routing.allocation.decider; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeFilters; import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.RoutingNode; @@ -109,20 +110,31 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing return allocation.decision(Decision.NO, NAME, explanation, initialRecoveryFilters); } } - return shouldFilter(shardRouting, node, allocation); + return shouldFilter(shardRouting, node.node(), allocation); } @Override public Decision canAllocate(IndexMetaData indexMetaData, RoutingNode node, RoutingAllocation allocation) { - return shouldFilter(indexMetaData, node, allocation); + return shouldFilter(indexMetaData, node.node(), allocation); } @Override public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { - return shouldFilter(shardRouting, 
node, allocation); + return shouldFilter(shardRouting, node.node(), allocation); } - private Decision shouldFilter(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + @Override + public Decision shouldAutoExpandToNode(IndexMetaData indexMetaData, DiscoveryNode node, RoutingAllocation allocation) { + Decision decision = shouldClusterFilter(node, allocation); + if (decision != null) return decision; + + decision = shouldIndexFilter(indexMetaData, node, allocation); + if (decision != null) return decision; + + return allocation.decision(Decision.YES, NAME, "node passes include/exclude/require filters"); + } + + private Decision shouldFilter(ShardRouting shardRouting, DiscoveryNode node, RoutingAllocation allocation) { Decision decision = shouldClusterFilter(node, allocation); if (decision != null) return decision; @@ -132,7 +144,7 @@ private Decision shouldFilter(ShardRouting shardRouting, RoutingNode node, Routi return allocation.decision(Decision.YES, NAME, "node passes include/exclude/require filters"); } - private Decision shouldFilter(IndexMetaData indexMd, RoutingNode node, RoutingAllocation allocation) { + private Decision shouldFilter(IndexMetaData indexMd, DiscoveryNode node, RoutingAllocation allocation) { Decision decision = shouldClusterFilter(node, allocation); if (decision != null) return decision; @@ -142,21 +154,21 @@ private Decision shouldFilter(IndexMetaData indexMd, RoutingNode node, RoutingAl return allocation.decision(Decision.YES, NAME, "node passes include/exclude/require filters"); } - private Decision shouldIndexFilter(IndexMetaData indexMd, RoutingNode node, RoutingAllocation allocation) { + private Decision shouldIndexFilter(IndexMetaData indexMd, DiscoveryNode node, RoutingAllocation allocation) { if (indexMd.requireFilters() != null) { - if (indexMd.requireFilters().match(node.node()) == false) { + if (indexMd.requireFilters().match(node) == false) { return allocation.decision(Decision.NO, NAME, "node does not 
match index setting [%s] filters [%s]", IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_PREFIX, indexMd.requireFilters()); } } if (indexMd.includeFilters() != null) { - if (indexMd.includeFilters().match(node.node()) == false) { + if (indexMd.includeFilters().match(node) == false) { return allocation.decision(Decision.NO, NAME, "node does not match index setting [%s] filters [%s]", IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_PREFIX, indexMd.includeFilters()); } } if (indexMd.excludeFilters() != null) { - if (indexMd.excludeFilters().match(node.node())) { + if (indexMd.excludeFilters().match(node)) { return allocation.decision(Decision.NO, NAME, "node matches index setting [%s] filters [%s]", IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING.getKey(), indexMd.excludeFilters()); } @@ -164,21 +176,21 @@ private Decision shouldIndexFilter(IndexMetaData indexMd, RoutingNode node, Rout return null; } - private Decision shouldClusterFilter(RoutingNode node, RoutingAllocation allocation) { + private Decision shouldClusterFilter(DiscoveryNode node, RoutingAllocation allocation) { if (clusterRequireFilters != null) { - if (clusterRequireFilters.match(node.node()) == false) { + if (clusterRequireFilters.match(node) == false) { return allocation.decision(Decision.NO, NAME, "node does not match cluster setting [%s] filters [%s]", CLUSTER_ROUTING_REQUIRE_GROUP_PREFIX, clusterRequireFilters); } } if (clusterIncludeFilters != null) { - if (clusterIncludeFilters.match(node.node()) == false) { + if (clusterIncludeFilters.match(node) == false) { return allocation.decision(Decision.NO, NAME, "node does not cluster setting [%s] filters [%s]", CLUSTER_ROUTING_INCLUDE_GROUP_PREFIX, clusterIncludeFilters); } } if (clusterExcludeFilters != null) { - if (clusterExcludeFilters.match(node.node())) { + if (clusterExcludeFilters.match(node)) { return allocation.decision(Decision.NO, NAME, "node matches cluster setting [%s] filters [%s]", CLUSTER_ROUTING_EXCLUDE_GROUP_PREFIX, clusterExcludeFilters); } 
diff --git a/server/src/test/java/org/elasticsearch/cluster/allocation/FilteringAllocationIT.java b/server/src/test/java/org/elasticsearch/cluster/allocation/FilteringAllocationIT.java index 0d9db409be111..ae27d444b45c9 100644 --- a/server/src/test/java/org/elasticsearch/cluster/allocation/FilteringAllocationIT.java +++ b/server/src/test/java/org/elasticsearch/cluster/allocation/FilteringAllocationIT.java @@ -21,6 +21,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.health.ClusterHealthStatus; +import org.elasticsearch.cluster.metadata.AutoExpandReplicas; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; @@ -97,6 +98,45 @@ public void testDecommissionNodeNoReplicas() { .execute().actionGet().getHits().getTotalHits().value, equalTo(100L)); } + public void testAutoExpandReplicasToFilteredNodes() { + logger.info("--> starting 2 nodes"); + List nodesIds = internalCluster().startNodes(2); + final String node_0 = nodesIds.get(0); + final String node_1 = nodesIds.get(1); + assertThat(cluster().size(), equalTo(2)); + + logger.info("--> creating an index with auto-expand replicas"); + createIndex("test", Settings.builder() + .put(AutoExpandReplicas.SETTING.getKey(), "0-all") + .build()); + ClusterState clusterState = client().admin().cluster().prepareState().execute().actionGet().getState(); + assertThat(clusterState.metaData().index("test").getNumberOfReplicas(), equalTo(1)); + ensureGreen("test"); + + logger.info("--> filter out the second node"); + if (randomBoolean()) { + client().admin().cluster().prepareUpdateSettings() + .setTransientSettings(Settings.builder().put("cluster.routing.allocation.exclude._name", node_1)) + .execute().actionGet(); + } else { + client().admin().indices().prepareUpdateSettings("test") + .setSettings(Settings.builder().put("index.routing.allocation.exclude._name", node_1)) + 
.execute().actionGet(); + } + ensureGreen("test"); + + logger.info("--> verify all are allocated on node1 now"); + clusterState = client().admin().cluster().prepareState().execute().actionGet().getState(); + assertThat(clusterState.metaData().index("test").getNumberOfReplicas(), equalTo(0)); + for (IndexRoutingTable indexRoutingTable : clusterState.routingTable()) { + for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) { + for (ShardRouting shardRouting : indexShardRoutingTable) { + assertThat(clusterState.nodes().get(shardRouting.currentNodeId()).getName(), equalTo(node_0)); + } + } + } + } + public void testDisablingAllocationFiltering() { logger.info("--> starting 2 nodes"); List nodesIds = internalCluster().startNodes(2);