diff --git a/docs/reference/modules/cluster/misc.asciidoc b/docs/reference/modules/cluster/misc.asciidoc index 65afd0e65cd0b..a447b49c6058e 100644 --- a/docs/reference/modules/cluster/misc.asciidoc +++ b/docs/reference/modules/cluster/misc.asciidoc @@ -45,11 +45,13 @@ either the limit is increased as described below, or some indices are <> or <> to bring the number of shards below the limit. -The cluster shard limit defaults to 1,000 shards per data node. -Both primary and replica shards of all open indices count toward the limit, -including unassigned shards. -For example, an open index with 5 primary shards and 2 replicas counts as 15 shards. -Closed indices do not contribute to the shard count. +The cluster shard limit defaults to 1,000 shards per non-frozen data node for +normal (non-frozen) indices and 3,000 shards per frozen data node for frozen +indices. +Both primary and replica shards of all open indices count toward the limit, +including unassigned shards. +For example, an open index with 5 primary shards and 2 replicas counts as 15 shards. +Closed indices do not contribute to the shard count. You can dynamically adjust the cluster shard limit with the following setting: @@ -61,7 +63,7 @@ You can dynamically adjust the cluster shard limit with the following setting: Limits the total number of primary and replica shards for the cluster. {es} calculates the limit as follows: -`cluster.max_shards_per_node * number of data nodes` +`cluster.max_shards_per_node * number of non-frozen data nodes` Shards for closed indices do not count toward this limit. Defaults to `1000`. A cluster with no data nodes is unlimited. @@ -71,7 +73,29 @@ example, a cluster with a `cluster.max_shards_per_node` setting of `100` and three data nodes has a shard limit of 300. If the cluster already contains 296 shards, {es} rejects any request that adds five or more shards to the cluster. -NOTE: This setting does not limit shards for individual nodes. To limit the +Notice that frozen shards have their own independent limit. +-- + +[[cluster-max-shards-per-node-frozen]] +`cluster.max_shards_per_node.frozen`:: ++ +-- +(<>) +Limits the total number of primary and replica frozen shards for the cluster. +{es} calculates the limit as follows: + +`cluster.max_shards_per_node.frozen * number of frozen data nodes` + +Shards for closed indices do not count toward this limit. Defaults to `3000`. +A cluster with no frozen data nodes is unlimited. + +{es} rejects any request that creates more frozen shards than this limit allows. +For example, a cluster with a `cluster.max_shards_per_node.frozen` setting of +`100` and three frozen data nodes has a frozen shard limit of 300. If the +cluster already contains 296 frozen shards, {es} rejects any request that adds five or +more frozen shards to the cluster. + +NOTE: These settings do not limit shards for individual nodes. To limit the number of shards for each node, use the <> setting.
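The limit arithmetic the docs describe is simple enough to sketch in isolation. The following minimal Java sketch mirrors the per-group check (class and method names here are illustrative only, not the actual {es} API; the real implementation is the `ShardLimitValidator` change further down in this diff):

[source,java]
----
// Minimal sketch of the per-group shard-limit check described above.
// "group" is either "normal" or "frozen"; each group is checked against
// its own setting and its own node count.
final class ShardLimitSketch {

    /** Returns an error message if the check fails, or null if it passes. */
    static String check(String group, int newShards, int currentOpenShards,
                        int maxShardsPerNode, int nodeCount) {
        // A cluster with no matching data nodes is unlimited.
        if (nodeCount == 0 || newShards <= 0) {
            return null;
        }
        int maxShardsInCluster = maxShardsPerNode * nodeCount;
        if (currentOpenShards + newShards > maxShardsInCluster) {
            return "this action would add [" + newShards + "] shards, but this cluster currently has ["
                + currentOpenShards + "]/[" + maxShardsInCluster + "] maximum " + group + " shards open";
        }
        return null;
    }

    public static void main(String[] args) {
        // The docs example: 100 shards per node * 3 data nodes = 300 maximum shards.
        System.out.println(check("normal", 5, 296, 100, 3)); // rejected: 296 + 5 > 300
        System.out.println(check("normal", 4, 296, 100, 3)); // null: 296 + 4 <= 300, allowed
        // Frozen shards are checked against frozen data nodes and their own setting.
        System.out.println(check("frozen", 10, 0, 3000, 0)); // null: no frozen nodes, unenforced
    }
}
----

Note how the two groups never share headroom: each check sees only its own node count and its own current shard total, which is why a request can be rejected for frozen shards while plenty of normal capacity remains, and vice versa.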
diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/shards/ClusterShardLimitIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/shards/ClusterShardLimitIT.java index 407c732fab02a..980f99068c4d6 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/shards/ClusterShardLimitIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/shards/ClusterShardLimitIT.java @@ -145,8 +145,8 @@ public void testIncreaseReplicasOverLimit() { fail("shouldn't be able to increase the number of replicas"); } catch (IllegalArgumentException e) { String expectedError = "Validation Failed: 1: this action would add [" + (dataNodes * firstShardCount) - + "] total shards, but this cluster currently has [" + firstShardCount + "]/[" + dataNodes * shardsPerNode - + "] maximum shards open;"; + + "] shards, but this cluster currently has [" + firstShardCount + "]/[" + dataNodes * shardsPerNode + + "] maximum normal shards open;"; assertEquals(expectedError, e.getMessage()); } Metadata clusterState = client().admin().cluster().prepareState().get().getState().metadata(); @@ -192,8 +192,8 @@ public void testChangingMultipleIndicesOverLimit() { int difference = totalShardsAfter - totalShardsBefore; String expectedError = "Validation Failed: 1: this action would add [" + difference - + "] total shards, but this cluster currently has [" + totalShardsBefore + "]/[" + dataNodes * shardsPerNode - + "] maximum shards open;"; + + "] shards, but this cluster currently has [" + totalShardsBefore + "]/[" + dataNodes * shardsPerNode + + "] maximum normal shards open;"; assertEquals(expectedError, e.getMessage()); } Metadata clusterState = client().admin().cluster().prepareState().get().getState().metadata(); @@ -352,7 +352,7 @@ private void verifyException(int dataNodes, ShardCounts counts, IllegalArgumentE int currentShards = counts.getFirstIndexShards() * (1 + counts.getFirstIndexReplicas()); int maxShards = counts.getShardsPerNode() * dataNodes; String expectedError = "Validation Failed: 1: this action would add [" + totalShards - + "] total shards, but this cluster currently has [" + currentShards + "]/[" + maxShards + "] maximum shards open;"; + + "] shards, but this cluster currently has [" + currentShards + "]/[" + maxShards + "] maximum normal shards open;"; assertEquals(expectedError, e.getMessage()); } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataUpdateSettingsService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataUpdateSettingsService.java index c6655b62e574f..2eab1d439a070 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataUpdateSettingsService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataUpdateSettingsService.java @@ -22,7 +22,6 @@ import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; -import org.elasticsearch.common.ValidationException; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.IndexScopedSettings; @@ -38,7 +37,6 @@ import java.util.Arrays; import java.util.HashSet; import java.util.Locale; -import java.util.Optional; import java.util.Set; import static org.elasticsearch.action.support.ContextPreservingActionListener.wrapPreservingContext; @@ -135,15 +133,7 @@ public ClusterState execute(ClusterState currentState) { final int 
updatedNumberOfReplicas = IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.get(openSettings); if (preserveExisting == false) { // Verify that this won't take us over the cluster shard limit. - int totalNewShards = Arrays.stream(request.indices()) - .mapToInt(i -> getTotalNewShards(i, currentState, updatedNumberOfReplicas)) - .sum(); - Optional<String> error = shardLimitValidator.checkShardLimit(totalNewShards, currentState); - if (error.isPresent()) { - ValidationException ex = new ValidationException(); - ex.addValidationError(error.get()); - throw ex; - } + shardLimitValidator.validateShardLimitOnReplicaUpdate(currentState, request.indices(), updatedNumberOfReplicas); /* * We do not update the in-sync allocation IDs as they will be removed upon the first index operation which makes @@ -269,14 +259,6 @@ public ClusterState execute(ClusterState currentState) { }); } - private int getTotalNewShards(Index index, ClusterState currentState, int updatedNumberOfReplicas) { - IndexMetadata indexMetadata = currentState.metadata().index(index); - int shardsInIndex = indexMetadata.getNumberOfShards(); - int oldNumberOfReplicas = indexMetadata.getNumberOfReplicas(); - int replicaIncrease = updatedNumberOfReplicas - oldNumberOfReplicas; - return replicaIncrease * shardsInIndex; - } - /** * Updates the cluster block only iff the setting exists in the given settings */ diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 9f6efdf682871..a72ea9507dcdf 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -476,7 +476,8 @@ public void apply(Settings value, Settings current, Settings previous) { FsHealthService.ENABLED_SETTING, FsHealthService.REFRESH_INTERVAL_SETTING, FsHealthService.SLOW_PATH_LOGGING_THRESHOLD_SETTING, - IndexingPressure.MAX_INDEXING_BYTES); + IndexingPressure.MAX_INDEXING_BYTES, + ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE_FROZEN); static List<SettingUpgrader<?>> BUILT_IN_SETTING_UPGRADERS = Collections.emptyList(); diff --git a/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java index ce0b24d0ea990..9745ba80ddc6a 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java @@ -33,6 +33,7 @@ import org.elasticsearch.index.store.FsDirectoryFactory; import org.elasticsearch.index.store.Store; import org.elasticsearch.indices.IndicesRequestCache; +import org.elasticsearch.indices.ShardLimitValidator; import java.util.Collections; import java.util.Map; @@ -157,6 +158,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings { MetadataIndexStateService.VERIFIED_BEFORE_CLOSE_SETTING, ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_SETTING, DiskThresholdDecider.SETTING_IGNORE_DISK_WATERMARKS, + ShardLimitValidator.INDEX_SETTING_SHARD_LIMIT_GROUP, // validate that built-in similarities don't get redefined Setting.groupSetting( diff --git a/server/src/main/java/org/elasticsearch/indices/ShardLimitValidator.java b/server/src/main/java/org/elasticsearch/indices/ShardLimitValidator.java index 12c178a89569b..fc874861e129a 100644 --- a/server/src/main/java/org/elasticsearch/indices/ShardLimitValidator.java +++
b/server/src/main/java/org/elasticsearch/indices/ShardLimitValidator.java @@ -10,15 +10,19 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodeRole; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.ValidationException; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; -import java.util.Arrays; import java.util.Optional; +import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Predicate; +import java.util.stream.StreamSupport; import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING; @@ -33,17 +37,40 @@ public class ShardLimitValidator { public static final Setting<Integer> SETTING_CLUSTER_MAX_SHARDS_PER_NODE = Setting.intSetting("cluster.max_shards_per_node", 1000, 1, Setting.Property.Dynamic, Setting.Property.NodeScope); + public static final Setting<Integer> SETTING_CLUSTER_MAX_SHARDS_PER_NODE_FROZEN = + Setting.intSetting("cluster.max_shards_per_node.frozen", 3000, 1, Setting.Property.Dynamic, Setting.Property.NodeScope); + public static final String FROZEN_GROUP = "frozen"; + static final Set<String> VALID_GROUPS = Set.of("normal", FROZEN_GROUP); + public static final Setting<String> INDEX_SETTING_SHARD_LIMIT_GROUP = + Setting.simpleString("index.shard_limit.group", "normal", + value -> { + if (VALID_GROUPS.contains(value) == false) { + throw new IllegalArgumentException("[" + value + "] is not a valid shard limit group"); + } + }, + Setting.Property.IndexScope, + Setting.Property.PrivateIndex, + Setting.Property.NotCopyableOnResize + ); protected final AtomicInteger shardLimitPerNode = new AtomicInteger(); + protected final AtomicInteger shardLimitPerNodeFrozen = new AtomicInteger(); public ShardLimitValidator(final Settings settings, ClusterService clusterService) { this.shardLimitPerNode.set(SETTING_CLUSTER_MAX_SHARDS_PER_NODE.get(settings)); + this.shardLimitPerNodeFrozen.set(SETTING_CLUSTER_MAX_SHARDS_PER_NODE_FROZEN.get(settings)); clusterService.getClusterSettings().addSettingsUpdateConsumer(SETTING_CLUSTER_MAX_SHARDS_PER_NODE, this::setShardLimitPerNode); + clusterService.getClusterSettings().addSettingsUpdateConsumer(SETTING_CLUSTER_MAX_SHARDS_PER_NODE_FROZEN, + this::setShardLimitPerNodeFrozen); } private void setShardLimitPerNode(int newValue) { this.shardLimitPerNode.set(newValue); } + private void setShardLimitPerNodeFrozen(int newValue) { + this.shardLimitPerNodeFrozen.set(newValue); + } + /** * Gets the currently configured value of the {@link ShardLimitValidator#SETTING_CLUSTER_MAX_SHARDS_PER_NODE} setting. * @return the current value of the setting @@ -63,8 +90,9 @@ public void validateShardLimit(final Settings settings, final ClusterState state final int numberOfShards = INDEX_NUMBER_OF_SHARDS_SETTING.get(settings); final int numberOfReplicas = IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.get(settings); final int shardsToCreate = numberOfShards * (1 + numberOfReplicas); + final boolean frozen = FROZEN_GROUP.equals(INDEX_SETTING_SHARD_LIMIT_GROUP.get(settings)); - final Optional<String> shardLimit = checkShardLimit(shardsToCreate, state); + final Optional<String> shardLimit = checkShardLimit(frozen == false ? shardsToCreate : 0, frozen ? shardsToCreate : 0, state); if (shardLimit.isPresent()) { final ValidationException e = new ValidationException(); e.addValidationError(shardLimit.get()); @@ -81,12 +109,42 @@ public void validateShardLimit(ClusterState currentState, Index[] indicesToOpen) * @throws ValidationException If this operation would take the cluster over the limit and enforcement is enabled. */ public void validateShardLimit(ClusterState currentState, Index[] indicesToOpen) { - int shardsToOpen = Arrays.stream(indicesToOpen) - .filter(index -> currentState.metadata().index(index).getState().equals(IndexMetadata.State.CLOSE)) - .mapToInt(index -> getTotalShardCount(currentState, index)) - .sum(); + int frozen = 0; + int normal = 0; + for (Index index : indicesToOpen) { + IndexMetadata imd = currentState.metadata().index(index); + if (imd.getState().equals(IndexMetadata.State.CLOSE)) { + int totalNewShards = imd.getNumberOfShards() * (1 + imd.getNumberOfReplicas()); + if (FROZEN_GROUP.equals(INDEX_SETTING_SHARD_LIMIT_GROUP.get(imd.getSettings()))) { + frozen += totalNewShards; + } else { + normal += totalNewShards; + } + } + } + + Optional<String> error = checkShardLimit(normal, frozen, currentState); + if (error.isPresent()) { + ValidationException ex = new ValidationException(); + ex.addValidationError(error.get()); + throw ex; + } + } - Optional<String> error = checkShardLimit(shardsToOpen, currentState); + public void validateShardLimitOnReplicaUpdate(ClusterState currentState, Index[] indices, int replicas) { + int frozen = 0; + int normal = 0; + for (Index index : indices) { + IndexMetadata imd = currentState.metadata().index(index); + int totalNewShards = getTotalNewShards(index, currentState, replicas); + if (FROZEN_GROUP.equals(INDEX_SETTING_SHARD_LIMIT_GROUP.get(imd.getSettings()))) { + frozen += totalNewShards; + } else { + normal += totalNewShards; + } + } + + Optional<String> error = checkShardLimit(normal, frozen, currentState); if (error.isPresent()) { ValidationException ex = new ValidationException(); ex.addValidationError(error.get()); @@ -94,42 +152,70 @@ public void validateShardLimit(ClusterState currentState, Index[] indicesToOpen) } } - private static int getTotalShardCount(ClusterState state, Index index) { - IndexMetadata indexMetadata = state.metadata().index(index); - return indexMetadata.getNumberOfShards() * (1 + indexMetadata.getNumberOfReplicas()); + private int getTotalNewShards(Index index, ClusterState currentState, int updatedNumberOfReplicas) { + IndexMetadata indexMetadata = currentState.metadata().index(index); + int shardsInIndex = indexMetadata.getNumberOfShards(); + int oldNumberOfReplicas = indexMetadata.getNumberOfReplicas(); + int replicaIncrease = updatedNumberOfReplicas - oldNumberOfReplicas; + return replicaIncrease * shardsInIndex; } /** * Checks to see if an operation can be performed without taking the cluster over the cluster-wide shard limit. * Returns an error message if appropriate, or an empty {@link Optional} otherwise. * - * @param newShards The number of shards to be added by this operation + * @param newShards The number of normal shards to be added by this operation + * @param newFrozenShards The number of frozen shards to be added by this operation * @param state The current cluster state * @return If present, an error message to be given as the reason for failing * an operation. If empty, a sign that the operation is valid.
*/ - public Optional<String> checkShardLimit(int newShards, ClusterState state) { - return checkShardLimit(newShards, state, getShardLimitPerNode()); + private Optional<String> checkShardLimit(int newShards, int newFrozenShards, ClusterState state) { + // We verify the two limits independently. This also means that a node with both frozen and non-frozen data roles can + // hold both 1000 normal and 3000 frozen shards. This is the trade-off that keeps the counts simple. We advise + // against such mixed nodes for production use anyway. + int frozenNodeCount = nodeCount(state, ShardLimitValidator::hasFrozen); + int normalNodeCount = nodeCount(state, ShardLimitValidator::hasNonFrozen); + return checkShardLimit(newShards, state, getShardLimitPerNode(), normalNodeCount, "normal") + .or(() -> checkShardLimit(newFrozenShards, state, shardLimitPerNodeFrozen.get(), frozenNodeCount, "frozen")); } // package-private for testing - static Optional<String> checkShardLimit(int newShards, ClusterState state, int maxShardsPerNodeSetting) { - int nodeCount = state.getNodes().getDataNodes().size(); - + static Optional<String> checkShardLimit(int newShards, ClusterState state, int maxShardsPerNode, int nodeCount, String group) { // Only enforce the shard limit if we have at least one data node, so that we don't block // index creation during cluster setup - if (nodeCount == 0 || newShards < 0) { + if (nodeCount == 0 || newShards <= 0) { return Optional.empty(); } - int maxShardsPerNode = maxShardsPerNodeSetting; int maxShardsInCluster = maxShardsPerNode * nodeCount; int currentOpenShards = state.getMetadata().getTotalOpenIndexShards(); if ((currentOpenShards + newShards) > maxShardsInCluster) { - String errorMessage = "this action would add [" + newShards + "] total shards, but this cluster currently has [" + - currentOpenShards + "]/[" + maxShardsInCluster + "] maximum shards open"; - return Optional.of(errorMessage); + Predicate<IndexMetadata> indexMetadataPredicate = imd -> + imd.getState().equals(IndexMetadata.State.OPEN) && group.equals(INDEX_SETTING_SHARD_LIMIT_GROUP.get(imd.getSettings())); + long currentFilteredShards = StreamSupport.stream(state.metadata().indices().values().spliterator(), false).map(oc -> oc.value) + .filter(indexMetadataPredicate).mapToInt(IndexMetadata::getTotalNumberOfShards).sum(); + if ((currentFilteredShards + newShards) > maxShardsInCluster) { + String errorMessage = "this action would add [" + newShards + "] shards, but this cluster currently has [" + + currentFilteredShards + "]/[" + maxShardsInCluster + "] maximum " + group + " shards open"; + return Optional.of(errorMessage); + } } return Optional.empty(); } + + private static int nodeCount(ClusterState state, Predicate<DiscoveryNode> nodePredicate) { + return (int) + StreamSupport.stream(state.getNodes().getDataNodes().values().spliterator(), false) + .map(oc -> oc.value).filter(nodePredicate).count(); + } + + private static boolean hasFrozen(DiscoveryNode node) { + return node.getRoles().contains(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE); + } + + private static boolean hasNonFrozen(DiscoveryNode node) { + return node.getRoles().stream().anyMatch(r -> r.canContainData() && r != DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE); + } + } diff --git a/server/src/test/java/org/elasticsearch/indices/ShardLimitValidatorTests.java b/server/src/test/java/org/elasticsearch/indices/ShardLimitValidatorTests.java index 3c7c11cbcfd66..94639adbd27c1 100644 --- a/server/src/test/java/org/elasticsearch/indices/ShardLimitValidatorTests.java +++
b/server/src/test/java/org/elasticsearch/indices/ShardLimitValidatorTests.java @@ -14,19 +14,23 @@ import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodeRole; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.shards.ShardCounts; import org.elasticsearch.common.ValidationException; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; import org.elasticsearch.test.ESTestCase; import java.util.Arrays; import java.util.Optional; +import java.util.Set; import java.util.stream.Collectors; +import java.util.stream.StreamSupport; import static org.elasticsearch.cluster.metadata.MetadataIndexStateServiceTests.addClosedIndex; import static org.elasticsearch.cluster.metadata.MetadataIndexStateServiceTests.addOpenedIndex; @@ -41,20 +45,20 @@ public void testOverShardLimit() { int nodesInCluster = randomIntBetween(1, 90); ShardCounts counts = forDataNodeCount(nodesInCluster); - Settings clusterSettings = Settings.builder().build(); - - ClusterState state = createClusterForShardLimitTest(nodesInCluster, counts.getFirstIndexShards(), counts.getFirstIndexReplicas() - ); + String group = randomFrom(ShardLimitValidator.VALID_GROUPS); + ClusterState state = createClusterForShardLimitTest(nodesInCluster, counts.getFirstIndexShards(), counts.getFirstIndexReplicas(), + group); int shardsToAdd = counts.getFailingIndexShards() * (1 + counts.getFailingIndexReplicas()); - Optional<String> errorMessage = ShardLimitValidator.checkShardLimit(shardsToAdd, state, counts.getShardsPerNode()); + Optional<String> errorMessage = + ShardLimitValidator.checkShardLimit(shardsToAdd, state, counts.getShardsPerNode(), nodesInCluster, group); int totalShards = counts.getFailingIndexShards() * (1 + counts.getFailingIndexReplicas()); int currentShards = counts.getFirstIndexShards() * (1 + counts.getFirstIndexReplicas()); int maxShards = counts.getShardsPerNode() * nodesInCluster; assertTrue(errorMessage.isPresent()); - assertEquals("this action would add [" + totalShards + "] total shards, but this cluster currently has [" + currentShards - + "]/[" + maxShards + "] maximum shards open", errorMessage.get()); + assertEquals("this action would add [" + totalShards + "] shards, but this cluster currently has [" + currentShards + + "]/[" + maxShards + "] maximum " + group + " shards open", errorMessage.get()); } public void testUnderShardLimit() { @@ -62,49 +66,81 @@ public void testUnderShardLimit() { // Calculate the counts for a cluster 1 node smaller than we have to ensure we have headroom ShardCounts counts = forDataNodeCount(nodesInCluster - 1); - Settings clusterSettings = Settings.builder().build(); - - ClusterState state = createClusterForShardLimitTest(nodesInCluster, counts.getFirstIndexShards(), counts.getFirstIndexReplicas() - ); + String group = randomFrom(ShardLimitValidator.VALID_GROUPS); + ClusterState state = createClusterForShardLimitTest(nodesInCluster, counts.getFirstIndexShards(), counts.getFirstIndexReplicas(), + group); int existingShards = counts.getFirstIndexShards() * (1 + counts.getFirstIndexReplicas()); int shardsToAdd = randomIntBetween(1, (counts.getShardsPerNode() * nodesInCluster) - existingShards); - Optional<String> errorMessage = ShardLimitValidator.checkShardLimit(shardsToAdd, state, counts.getShardsPerNode()); + Optional<String> errorMessage = + ShardLimitValidator.checkShardLimit(shardsToAdd, state, counts.getShardsPerNode(), nodesInCluster, group); assertFalse(errorMessage.isPresent()); } - public void testValidateShardLimit() { + public void testValidateShardLimitOpenIndices() { int nodesInCluster = randomIntBetween(2, 90); ShardCounts counts = forDataNodeCount(nodesInCluster); - ClusterState state = createClusterForShardLimitTest(nodesInCluster, counts.getFirstIndexShards(), counts.getFirstIndexReplicas(), - counts.getFailingIndexShards(), counts.getFailingIndexReplicas()); + final String group = randomFrom(ShardLimitValidator.VALID_GROUPS); + final ClusterState state = createClusterForShardLimitTest(nodesInCluster, counts.getFirstIndexShards(), + counts.getFirstIndexReplicas(), + counts.getFailingIndexShards(), counts.getFailingIndexReplicas(), group); - Index[] indices = Arrays.stream(state.metadata().indices().values().toArray(IndexMetadata.class)) - .map(IndexMetadata::getIndex) - .collect(Collectors.toList()) - .toArray(new Index[2]); + Index[] indices = getIndices(state); int totalShards = counts.getFailingIndexShards() * (1 + counts.getFailingIndexReplicas()); int currentShards = counts.getFirstIndexShards() * (1 + counts.getFirstIndexReplicas()); int maxShards = counts.getShardsPerNode() * nodesInCluster; - ShardLimitValidator shardLimitValidator = createTestShardLimitService(counts.getShardsPerNode()); + ShardLimitValidator shardLimitValidator = createTestShardLimitService(counts.getShardsPerNode(), group); ValidationException exception = expectThrows(ValidationException.class, () -> shardLimitValidator.validateShardLimit(state, indices)); - assertEquals("Validation Failed: 1: this action would add [" + totalShards + "] total shards, but this cluster currently has [" + - currentShards + "]/[" + maxShards + "] maximum shards open;", exception.getMessage()); + assertEquals("Validation Failed: 1: this action would add [" + totalShards + "] shards, but this cluster currently has [" + + currentShards + "]/[" + maxShards + "] maximum " + group + " shards open;", exception.getMessage()); } - public static ClusterState createClusterForShardLimitTest(int nodesInCluster, int shardsInIndex, int replicas) { - ImmutableOpenMap.Builder<String, DiscoveryNode> dataNodes = ImmutableOpenMap.builder(); - for (int i = 0; i < nodesInCluster; i++) { - dataNodes.put(randomAlphaOfLengthBetween(5, 15), mock(DiscoveryNode.class)); + public void testValidateShardLimitUpdateReplicas() { + final int nodesInCluster = randomIntBetween(2, 90); + final int shardsPerNode = randomIntBetween(1, 10); + final String group = randomFrom(ShardLimitValidator.VALID_GROUPS); + ClusterState state = createClusterStateForReplicaUpdate(nodesInCluster, shardsPerNode, group); + + final Index[] indices = getIndices(state); + final ShardLimitValidator shardLimitValidator = createTestShardLimitService(shardsPerNode, group); + shardLimitValidator.validateShardLimitOnReplicaUpdate(state, indices, nodesInCluster - 1); + + ValidationException exception = expectThrows(ValidationException.class, + () -> shardLimitValidator.validateShardLimitOnReplicaUpdate(state, indices, nodesInCluster)); + assertEquals("Validation Failed: 1: this action would add [" + (shardsPerNode * 2) + "] shards, but this cluster currently has [" + + (shardsPerNode * (nodesInCluster - 1)) + "]/[" + shardsPerNode * nodesInCluster + "] maximum " + group + "
shards open;", + exception.getMessage()); + } + + public Index[] getIndices(ClusterState state) { + return Arrays.stream(state.metadata().indices().values().toArray(IndexMetadata.class)) + .map(IndexMetadata::getIndex) + .collect(Collectors.toList()) + .toArray(Index.EMPTY_ARRAY); + } + + private ClusterState createClusterStateForReplicaUpdate(int nodesInCluster, int shardsPerNode, String group) { + DiscoveryNodes nodes = createDiscoveryNodes(nodesInCluster, group); + ClusterState state = ClusterState.builder(ClusterName.DEFAULT).nodes(nodes).build(); + state = addOpenedIndex(randomAlphaOfLengthBetween(5, 15), shardsPerNode, nodesInCluster - 2, state); + if (group.equals(ShardLimitValidator.FROZEN_GROUP)) { + state = ClusterState.builder(state).metadata(freezeMetadata(Metadata.builder(state.metadata()), state.metadata())).build(); } - DiscoveryNodes nodes = mock(DiscoveryNodes.class); - when(nodes.getDataNodes()).thenReturn(dataNodes.build()); + return state; + } + + public static ClusterState createClusterForShardLimitTest(int nodesInCluster, int shardsInIndex, int replicas, String group) { + DiscoveryNodes nodes = createDiscoveryNodes(nodesInCluster, group); + Settings.Builder settings = Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT); + if (ShardLimitValidator.FROZEN_GROUP.equals(group) || randomBoolean()) { + settings.put(ShardLimitValidator.INDEX_SETTING_SHARD_LIMIT_GROUP.getKey(), group); + } IndexMetadata.Builder indexMetadata = IndexMetadata.builder(randomAlphaOfLengthBetween(5, 15)) - .settings(Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)) + .settings(settings) .creationDate(randomLong()) .numberOfShards(shardsInIndex) .numberOfReplicas(replicas); @@ -122,13 +158,8 @@ public static ClusterState createClusterForShardLimitTest(int nodesInCluster, in } public static ClusterState createClusterForShardLimitTest(int nodesInCluster, int openIndexShards, int openIndexReplicas, - int closedIndexShards, int closedIndexReplicas) { - ImmutableOpenMap.Builder dataNodes = ImmutableOpenMap.builder(); - for (int i = 0; i < nodesInCluster; i++) { - dataNodes.put(randomAlphaOfLengthBetween(5, 15), mock(DiscoveryNode.class)); - } - DiscoveryNodes nodes = mock(DiscoveryNodes.class); - when(nodes.getDataNodes()).thenReturn(dataNodes.build()); + int closedIndexShards, int closedIndexReplicas, String group) { + DiscoveryNodes nodes = createDiscoveryNodes(nodesInCluster, group); ClusterState state = ClusterState.builder(ClusterName.DEFAULT).build(); state = addOpenedIndex(randomAlphaOfLengthBetween(5, 15), openIndexShards, openIndexReplicas, state); @@ -140,9 +171,57 @@ public static ClusterState createClusterForShardLimitTest(int nodesInCluster, in } else { metadata.transientSettings(Settings.EMPTY); } + if (ShardLimitValidator.FROZEN_GROUP.equals(group)) { + freezeMetadata(metadata, state.metadata()); + } return ClusterState.builder(state).metadata(metadata).nodes(nodes).build(); } + public static DiscoveryNodes createDiscoveryNodes(int nodesInCluster, String group) { + ImmutableOpenMap.Builder dataNodes = ImmutableOpenMap.builder(); + for (int i = 0; i < nodesInCluster; i++) { + dataNodes.put(randomAlphaOfLengthBetween(5, 15), createNode(group)); + } + DiscoveryNodes nodes = mock(DiscoveryNodes.class); + when(nodes.getDataNodes()).thenReturn(dataNodes.build()); + return nodes; + } + + private static DiscoveryNode createNode(String group) { + DiscoveryNode mock = mock(DiscoveryNode.class); + if 
(ShardLimitValidator.FROZEN_GROUP.equals(group)) { + when(mock.getRoles()).thenReturn(randomBoolean() ? DiscoveryNodeRole.roles() : + Set.of(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE)); + } else { + when(mock.getRoles()).thenReturn(randomBoolean() ? DiscoveryNodeRole.roles() : + Set.of(randomFrom(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.DATA_HOT_NODE_ROLE, + DiscoveryNodeRole.DATA_WARM_NODE_ROLE, DiscoveryNodeRole.DATA_COLD_NODE_ROLE))); + } + return mock; + } + + private static Metadata.Builder freezeMetadata(Metadata.Builder builder, Metadata metadata) { + StreamSupport.stream(metadata.indices().values().spliterator(), false) + .map(oc -> oc.value).map(imd -> IndexMetadata.builder(imd).settings(Settings.builder().put(imd.getSettings()) + .put(ShardLimitValidator.INDEX_SETTING_SHARD_LIMIT_GROUP.getKey(), ShardLimitValidator.FROZEN_GROUP))) + .forEach(builder::put); + return builder; + } + + + public static ShardLimitValidator createTestShardLimitService(int maxShardsPerNode, String group) { + Setting setting = ShardLimitValidator.FROZEN_GROUP.equals(group) ? + ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE_FROZEN : SETTING_CLUSTER_MAX_SHARDS_PER_NODE; + + // Use a mocked clusterService - for unit tests we won't be updating the setting anyway. + ClusterService clusterService = mock(ClusterService.class); + Settings limitOnlySettings = Settings.builder().put(setting.getKey(), maxShardsPerNode).build(); + when(clusterService.getClusterSettings()) + .thenReturn(new ClusterSettings(limitOnlySettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); + + return new ShardLimitValidator(limitOnlySettings, clusterService); + } + /** * Creates a {@link ShardLimitValidator} for testing with the given setting and a mocked cluster service. * diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsShardLimitIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsShardLimitIntegTests.java new file mode 100644 index 0000000000000..88d188f7a6ab0 --- /dev/null +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsShardLimitIntegTests.java @@ -0,0 +1,99 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +package org.elasticsearch.xpack.searchablesnapshots; + +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.indices.ShardLimitValidator; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotRequest; + +import java.util.Locale; + +import static org.elasticsearch.index.IndexSettings.INDEX_SOFT_DELETES_SETTING; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; + +@ESIntegTestCase.ClusterScope(maxNumDataNodes = 1) +public class SearchableSnapshotsShardLimitIntegTests extends BaseSearchableSnapshotsIntegTestCase { + + private static final int MAX_NORMAL = 3; + private static final int MAX_FROZEN = 20; + + private final String fsRepoName = randomAlphaOfLength(10); + private final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + private final String snapshotName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + + @Override + protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal, otherSettings)) + .put(ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey(), MAX_NORMAL) + .put(ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE_FROZEN.getKey(), MAX_FROZEN) + .build(); + } + + @Override + protected int numberOfShards() { + return 1; + } + + @Override + protected int numberOfReplicas() { + return 0; + } + + public void testFrozenAndNormalIndependent() throws Exception { + createRepository(fsRepoName, "fs"); + + assertAcked(prepareCreate(indexName, Settings.builder().put(INDEX_SOFT_DELETES_SETTING.getKey(), true))); + + createFullSnapshot(fsRepoName, snapshotName); + + final Settings.Builder indexSettingsBuilder = Settings.builder(); + final int initialCopies = between(1, MAX_FROZEN); + indexSettingsBuilder.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, initialCopies - 1); + mount(indexSettingsBuilder, MountSearchableSnapshotRequest.Storage.SHARED_CACHE); + + // one above limit. + indexSettingsBuilder.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, (MAX_FROZEN + 1) - initialCopies - 1); + expectLimitThrows(() -> mount(indexSettingsBuilder, MountSearchableSnapshotRequest.Storage.SHARED_CACHE)); + + // mount just at limit. + indexSettingsBuilder.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, MAX_FROZEN - initialCopies - 1); + mount(indexSettingsBuilder, MountSearchableSnapshotRequest.Storage.SHARED_CACHE); + + // cannot mount one more shard. 
+ expectLimitThrows(() -> mount(Settings.EMPTY, MountSearchableSnapshotRequest.Storage.SHARED_CACHE)); + + // can still do full copy + mount(Settings.EMPTY, MountSearchableSnapshotRequest.Storage.FULL_COPY); + + // and normal index + createIndex(); + + // but now we have 3 normal shards, so must fail + expectLimitThrows(() -> mount(Settings.EMPTY, MountSearchableSnapshotRequest.Storage.FULL_COPY)); + expectLimitThrows(this::createIndex); + } + + private void expectLimitThrows(ThrowingRunnable runnable) { + expectThrows(IllegalArgumentException.class, runnable); + } + + public void createIndex() { + createIndex(randomAlphaOfLength(10).toLowerCase(Locale.ROOT)); + } + + private void mount(Settings.Builder settings, MountSearchableSnapshotRequest.Storage storage) throws Exception { + mount(settings.build(), storage); + } + + private void mount(Settings settings, MountSearchableSnapshotRequest.Storage storage) throws Exception { + mountSnapshot(fsRepoName, snapshotName, indexName, randomAlphaOfLength(11).toLowerCase(Locale.ROOT), settings, storage); + } +} diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/TransportMountSearchableSnapshotAction.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/TransportMountSearchableSnapshotAction.java index 3245136b4d1e2..018eb9bd5645a 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/TransportMountSearchableSnapshotAction.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/TransportMountSearchableSnapshotAction.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.indices.ShardLimitValidator; import org.elasticsearch.indices.SystemIndices; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.repositories.IndexId; @@ -145,7 +146,8 @@ private static Settings buildIndexSettings( if (storage == MountSearchableSnapshotRequest.Storage.SHARED_CACHE) { settings.put(SearchableSnapshotsConstants.SNAPSHOT_PARTIAL_SETTING.getKey(), true) - .put(DiskThresholdDecider.SETTING_IGNORE_DISK_WATERMARKS.getKey(), true); + .put(DiskThresholdDecider.SETTING_IGNORE_DISK_WATERMARKS.getKey(), true) + .put(ShardLimitValidator.INDEX_SETTING_SHARD_LIMIT_GROUP.getKey(), ShardLimitValidator.FROZEN_GROUP); } return settings.build();
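Taken together, the changes above mean that mounting a snapshot with the SHARED_CACHE storage option tags the resulting index with the frozen shard-limit group, while ShardLimitValidator counts frozen and non-frozen data nodes separately. The following rough sketch illustrates that node classification; string role names stand in for the real DiscoveryNodeRole API, so treat this as an assumption-laden illustration rather than the actual implementation:

[source,java]
----
import java.util.Set;

// Sketch of the node classification behind the two limits. A node that holds
// both frozen and non-frozen data roles counts toward BOTH limits, which is
// the mixed-node trade-off called out in the checkShardLimit comment above.
final class NodeCountSketch {

    static boolean hasFrozen(Set<String> roles) {
        return roles.contains("data_frozen");
    }

    static boolean hasNonFrozen(Set<String> roles) {
        // Simplified stand-in for r.canContainData() && r != DATA_FROZEN_NODE_ROLE.
        return roles.stream().anyMatch(r -> r.startsWith("data") && !r.equals("data_frozen"));
    }

    public static void main(String[] args) {
        Set<String> mixed = Set.of("data_hot", "data_frozen");
        // A mixed node is counted for both groups, so under the default settings
        // it may carry up to 1000 normal plus 3000 frozen shards.
        System.out.println(hasFrozen(mixed) + " " + hasNonFrozen(mixed)); // true true

        Set<String> frozenOnly = Set.of("data_frozen");
        System.out.println(hasFrozen(frozenOnly) + " " + hasNonFrozen(frozenOnly)); // true false
    }
}
----

This independence is exactly what the SearchableSnapshotsShardLimitIntegTests above exercises: with one node carrying both limits, frozen mounts can be rejected while full-copy mounts and normal index creation still succeed, and vice versa.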