Introduce separate shard limit for frozen shards #71392

Merged
38 changes: 31 additions & 7 deletions docs/reference/modules/cluster/misc.asciidoc
@@ -45,11 +45,13 @@ either the limit is increased as described below, or some indices are
<<indices-open-close,closed>> or <<indices-delete-index,deleted>> to bring the
number of shards below the limit.

The cluster shard limit defaults to 1,000 shards per data node.
Both primary and replica shards of all open indices count toward the limit,
including unassigned shards.
For example, an open index with 5 primary shards and 2 replicas counts as 15 shards.
Closed indices do not contribute to the shard count.
The cluster shard limit defaults to 1,000 shards per non-frozen data node for
normal (non-frozen) indices and 3,000 shards per frozen data node for frozen
indices.
Both primary and replica shards of all open indices count toward the limit,
including unassigned shards.
For example, an open index with 5 primary shards and 2 replicas counts as 15 shards.
Closed indices do not contribute to the shard count.

You can dynamically adjust the cluster shard limit with the following setting:

@@ -61,7 +63,7 @@ You can dynamically adjust the cluster shard limit with the following setting:
Limits the total number of primary and replica shards for the cluster. {es}
calculates the limit as follows:

`cluster.max_shards_per_node * number of data nodes`
`cluster.max_shards_per_node * number of non-frozen data nodes`

Shards for closed indices do not count toward this limit. Defaults to `1000`.
A cluster with no data nodes is unlimited.
@@ -71,7 +73,29 @@ example, a cluster with a `cluster.max_shards_per_node` setting of `100` and
three data nodes has a shard limit of 300. If the cluster already contains 296
shards, {es} rejects any request that adds five or more shards to the cluster.

NOTE: This setting does not limit shards for individual nodes. To limit the
Note that frozen shards have their own independent limit; see
`cluster.max_shards_per_node.frozen` below.
--

[[cluster-max-shards-per-node-frozen]]
`cluster.max_shards_per_node.frozen`::
+
--
(<<dynamic-cluster-setting,Dynamic>>)
Limits the total number of primary and replica frozen shards for the cluster.
{es} calculates the limit as follows:

`cluster.max_shards_per_node.frozen * number of frozen data nodes`

Shards for closed indices do not count toward this limit. Defaults to `3000`.
A cluster with no frozen data nodes is unlimited.

{es} rejects any request that creates more frozen shards than this limit allows.
For example, a cluster with a `cluster.max_shards_per_node.frozen` setting of
`100` and three frozen data nodes has a frozen shard limit of 300. If the
cluster already contains 296 frozen shards, {es} rejects any request that adds five or
more frozen shards to the cluster.

NOTE: These settings do not limit shards for individual nodes. To limit the
number of shards for each node, use the
<<cluster-total-shards-per-node,`cluster.routing.allocation.total_shards_per_node`>>
setting.
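
To make the two formulas above concrete, here is a minimal sketch in plain Java (the node counts and shard totals are made-up assumptions, and the class is illustrative rather than Elasticsearch code):

public class ShardLimitMath {
    public static void main(String[] args) {
        int maxShardsPerNode = 1000;        // cluster.max_shards_per_node (default)
        int maxFrozenShardsPerNode = 3000;  // cluster.max_shards_per_node.frozen (default)
        int nonFrozenDataNodes = 3;         // assumed topology
        int frozenDataNodes = 2;

        int normalLimit = maxShardsPerNode * nonFrozenDataNodes;     // 3000
        int frozenLimit = maxFrozenShardsPerNode * frozenDataNodes;  // 6000

        // An open index with 5 primaries and 2 replicas counts as 15 shards
        // against whichever group it belongs to.
        int requestedShards = 5 * (1 + 2);
        int currentNormalShards = 2990;     // assumed current open normal shards

        boolean rejected = currentNormalShards + requestedShards > normalLimit;
        System.out.println("normal limit=" + normalLimit + ", frozen limit=" + frozenLimit
            + ", request rejected=" + rejected);  // rejected=true: 2990 + 15 > 3000
    }
}

A request is rejected only when it would push its own group over that group's limit; the other group's shard count does not factor in.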
@@ -145,8 +145,8 @@ public void testIncreaseReplicasOverLimit() {
fail("shouldn't be able to increase the number of replicas");
} catch (IllegalArgumentException e) {
String expectedError = "Validation Failed: 1: this action would add [" + (dataNodes * firstShardCount)
+ "] total shards, but this cluster currently has [" + firstShardCount + "]/[" + dataNodes * shardsPerNode
+ "] maximum shards open;";
+ "] shards, but this cluster currently has [" + firstShardCount + "]/[" + dataNodes * shardsPerNode
+ "] maximum normal shards open;";
assertEquals(expectedError, e.getMessage());
}
Metadata clusterState = client().admin().cluster().prepareState().get().getState().metadata();
@@ -192,8 +192,8 @@ public void testChangingMultipleIndicesOverLimit() {
int difference = totalShardsAfter - totalShardsBefore;

String expectedError = "Validation Failed: 1: this action would add [" + difference
+ "] total shards, but this cluster currently has [" + totalShardsBefore + "]/[" + dataNodes * shardsPerNode
+ "] maximum shards open;";
+ "] shards, but this cluster currently has [" + totalShardsBefore + "]/[" + dataNodes * shardsPerNode
+ "] maximum normal shards open;";
assertEquals(expectedError, e.getMessage());
}
Metadata clusterState = client().admin().cluster().prepareState().get().getState().metadata();
@@ -352,7 +352,7 @@ private void verifyException(int dataNodes, ShardCounts counts, IllegalArgumentE
int currentShards = counts.getFirstIndexShards() * (1 + counts.getFirstIndexReplicas());
int maxShards = counts.getShardsPerNode() * dataNodes;
String expectedError = "Validation Failed: 1: this action would add [" + totalShards
+ "] total shards, but this cluster currently has [" + currentShards + "]/[" + maxShards + "] maximum shards open;";
+ "] shards, but this cluster currently has [" + currentShards + "]/[" + maxShards + "] maximum normal shards open;";
assertEquals(expectedError, e.getMessage());
}
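
The reworded assertions above cover the normal group; for frozen indices the validator emits the same pattern with "frozen" in place of "normal". The helper below is a hypothetical sketch of that message shape, not code from this PR:

// Hypothetical illustration only: the frozen-group variant of the validation message.
static String expectedFrozenShardError(int newShards, int currentFrozenShards, int maxFrozenShards) {
    return "Validation Failed: 1: this action would add [" + newShards
        + "] shards, but this cluster currently has [" + currentFrozenShards + "]/[" + maxFrozenShards
        + "] maximum frozen shards open;";
}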

@@ -22,7 +22,6 @@
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.ValidationException;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.IndexScopedSettings;
@@ -38,7 +37,6 @@
import java.util.Arrays;
import java.util.HashSet;
import java.util.Locale;
import java.util.Optional;
import java.util.Set;

import static org.elasticsearch.action.support.ContextPreservingActionListener.wrapPreservingContext;
@@ -135,15 +133,7 @@ public ClusterState execute(ClusterState currentState) {
final int updatedNumberOfReplicas = IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.get(openSettings);
if (preserveExisting == false) {
// Verify that this won't take us over the cluster shard limit.
int totalNewShards = Arrays.stream(request.indices())
.mapToInt(i -> getTotalNewShards(i, currentState, updatedNumberOfReplicas))
.sum();
Optional<String> error = shardLimitValidator.checkShardLimit(totalNewShards, currentState);
if (error.isPresent()) {
ValidationException ex = new ValidationException();
ex.addValidationError(error.get());
throw ex;
}
shardLimitValidator.validateShardLimitOnReplicaUpdate(currentState, request.indices(), updatedNumberOfReplicas);

/*
* We do not update the in-sync allocation IDs as they will be removed upon the first index operation which makes
@@ -269,14 +259,6 @@ public ClusterState execute(ClusterState currentState) {
});
}

private int getTotalNewShards(Index index, ClusterState currentState, int updatedNumberOfReplicas) {
IndexMetadata indexMetadata = currentState.metadata().index(index);
int shardsInIndex = indexMetadata.getNumberOfShards();
int oldNumberOfReplicas = indexMetadata.getNumberOfReplicas();
int replicaIncrease = updatedNumberOfReplicas - oldNumberOfReplicas;
return replicaIncrease * shardsInIndex;
}

/**
* Updates the cluster block only iff the setting exists in the given settings
*/
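
The inline shard-limit check and the getTotalNewShards helper removed from this file now live in ShardLimitValidator (shown further down); the per-index arithmetic is unchanged. A small worked sketch of that delta, with made-up numbers:

// Worked example (made-up numbers): an index with 5 primaries going from 1 to 3
// replicas adds 5 * (3 - 1) = 10 shards to its shard-limit group.
public class ReplicaDeltaExample {
    public static void main(String[] args) {
        int shardsInIndex = 5;
        int oldNumberOfReplicas = 1;
        int updatedNumberOfReplicas = 3;
        System.out.println(shardsInIndex * (updatedNumberOfReplicas - oldNumberOfReplicas)); // 10
    }
}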
@@ -476,7 +476,8 @@ public void apply(Settings value, Settings current, Settings previous) {
FsHealthService.ENABLED_SETTING,
FsHealthService.REFRESH_INTERVAL_SETTING,
FsHealthService.SLOW_PATH_LOGGING_THRESHOLD_SETTING,
IndexingPressure.MAX_INDEXING_BYTES);
IndexingPressure.MAX_INDEXING_BYTES,
ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE_FROZEN);

static List<SettingUpgrader<?>> BUILT_IN_SETTING_UPGRADERS = Collections.emptyList();
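
Registering SETTING_CLUSTER_MAX_SHARDS_PER_NODE_FROZEN above is what allows it to be updated at runtime. As a sketch of how the new setting could then be adjusted from the Java client API (the class name and value here are assumptions for illustration):

import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;

public final class FrozenShardLimitUpdater {
    // Sketch: raise the frozen shard budget via a persistent cluster settings update.
    public static void raiseFrozenShardLimit(Client client, int shardsPerFrozenNode) {
        ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest();
        request.persistentSettings(Settings.builder()
            .put("cluster.max_shards_per_node.frozen", shardsPerFrozenNode)
            .build());
        client.admin().cluster().updateSettings(request).actionGet();
    }
}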

@@ -33,6 +33,7 @@
import org.elasticsearch.index.store.FsDirectoryFactory;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.indices.IndicesRequestCache;
import org.elasticsearch.indices.ShardLimitValidator;

import java.util.Collections;
import java.util.Map;
@@ -157,6 +158,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
MetadataIndexStateService.VERIFIED_BEFORE_CLOSE_SETTING,
ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_SETTING,
DiskThresholdDecider.SETTING_IGNORE_DISK_WATERMARKS,
ShardLimitValidator.INDEX_SETTING_SHARD_LIMIT_GROUP,

// validate that built-in similarities don't get redefined
Setting.groupSetting(
128 changes: 107 additions & 21 deletions server/src/main/java/org/elasticsearch/indices/ShardLimitValidator.java
@@ -10,15 +10,19 @@

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ValidationException;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;

import java.util.Arrays;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.stream.StreamSupport;

import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING;

@@ -33,17 +37,40 @@
public class ShardLimitValidator {
public static final Setting<Integer> SETTING_CLUSTER_MAX_SHARDS_PER_NODE =
Setting.intSetting("cluster.max_shards_per_node", 1000, 1, Setting.Property.Dynamic, Setting.Property.NodeScope);
public static final Setting<Integer> SETTING_CLUSTER_MAX_SHARDS_PER_NODE_FROZEN =
Setting.intSetting("cluster.max_shards_per_node.frozen", 3000, 1, Setting.Property.Dynamic, Setting.Property.NodeScope);
public static final String FROZEN_GROUP = "frozen";
static final Set<String> VALID_GROUPS = Set.of("normal", FROZEN_GROUP);
public static final Setting<String> INDEX_SETTING_SHARD_LIMIT_GROUP =
Setting.simpleString("index.shard_limit.group", "normal",
value -> {
if (VALID_GROUPS.contains(value) == false) {
throw new IllegalArgumentException("[" + value + "] is not a valid shard limit group");
}
},
Setting.Property.IndexScope,
Setting.Property.PrivateIndex,
Setting.Property.NotCopyableOnResize
);
protected final AtomicInteger shardLimitPerNode = new AtomicInteger();
protected final AtomicInteger shardLimitPerNodeFrozen = new AtomicInteger();

public ShardLimitValidator(final Settings settings, ClusterService clusterService) {
this.shardLimitPerNode.set(SETTING_CLUSTER_MAX_SHARDS_PER_NODE.get(settings));
this.shardLimitPerNodeFrozen.set(SETTING_CLUSTER_MAX_SHARDS_PER_NODE_FROZEN.get(settings));
clusterService.getClusterSettings().addSettingsUpdateConsumer(SETTING_CLUSTER_MAX_SHARDS_PER_NODE, this::setShardLimitPerNode);
clusterService.getClusterSettings().addSettingsUpdateConsumer(SETTING_CLUSTER_MAX_SHARDS_PER_NODE_FROZEN,
this::setShardLimitPerNodeFrozen);
}

private void setShardLimitPerNode(int newValue) {
this.shardLimitPerNode.set(newValue);
}

private void setShardLimitPerNodeFrozen(int newValue) {
this.shardLimitPerNodeFrozen.set(newValue);
}

/**
* Gets the currently configured value of the {@link ShardLimitValidator#SETTING_CLUSTER_MAX_SHARDS_PER_NODE} setting.
* @return the current value of the setting
@@ -63,8 +90,9 @@ public void validateShardLimit(final Settings settings, final ClusterState state
final int numberOfShards = INDEX_NUMBER_OF_SHARDS_SETTING.get(settings);
final int numberOfReplicas = IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.get(settings);
final int shardsToCreate = numberOfShards * (1 + numberOfReplicas);
final boolean frozen = FROZEN_GROUP.equals(INDEX_SETTING_SHARD_LIMIT_GROUP.get(settings));

final Optional<String> shardLimit = checkShardLimit(shardsToCreate, state);
final Optional<String> shardLimit = checkShardLimit(frozen == false ? shardsToCreate : 0, frozen ? shardsToCreate : 0, state);
if (shardLimit.isPresent()) {
final ValidationException e = new ValidationException();
e.addValidationError(shardLimit.get());
@@ -81,55 +109,113 @@ public void validateShardLimit(final Settings settings, final ClusterState state
* @throws ValidationException If this operation would take the cluster over the limit and enforcement is enabled.
*/
public void validateShardLimit(ClusterState currentState, Index[] indicesToOpen) {
int shardsToOpen = Arrays.stream(indicesToOpen)
.filter(index -> currentState.metadata().index(index).getState().equals(IndexMetadata.State.CLOSE))
.mapToInt(index -> getTotalShardCount(currentState, index))
.sum();
int frozen = 0;
int normal = 0;
for (Index index : indicesToOpen) {
IndexMetadata imd = currentState.metadata().index(index);
if (imd.getState().equals(IndexMetadata.State.CLOSE)) {
int totalNewShards = imd.getNumberOfShards() * (1 + imd.getNumberOfReplicas());
if (FROZEN_GROUP.equals(INDEX_SETTING_SHARD_LIMIT_GROUP.get(imd.getSettings()))) {
frozen += totalNewShards;
} else {
normal += totalNewShards;
}
}
}

Optional<String> error = checkShardLimit(normal, frozen, currentState);
if (error.isPresent()) {
ValidationException ex = new ValidationException();
ex.addValidationError(error.get());
throw ex;
}
}

Optional<String> error = checkShardLimit(shardsToOpen, currentState);
public void validateShardLimitOnReplicaUpdate(ClusterState currentState, Index[] indices, int replicas) {
int frozen = 0;
int normal = 0;
for (Index index : indices) {
IndexMetadata imd = currentState.metadata().index(index);
int totalNewShards = getTotalNewShards(index, currentState, replicas);
if (FROZEN_GROUP.equals(INDEX_SETTING_SHARD_LIMIT_GROUP.get(imd.getSettings()))) {
frozen += totalNewShards;
} else {
normal += totalNewShards;
}
}

Optional<String> error = checkShardLimit(normal, frozen, currentState);
if (error.isPresent()) {
ValidationException ex = new ValidationException();
ex.addValidationError(error.get());
throw ex;
}
}

private static int getTotalShardCount(ClusterState state, Index index) {
IndexMetadata indexMetadata = state.metadata().index(index);
return indexMetadata.getNumberOfShards() * (1 + indexMetadata.getNumberOfReplicas());
private int getTotalNewShards(Index index, ClusterState currentState, int updatedNumberOfReplicas) {
IndexMetadata indexMetadata = currentState.metadata().index(index);
int shardsInIndex = indexMetadata.getNumberOfShards();
int oldNumberOfReplicas = indexMetadata.getNumberOfReplicas();
int replicaIncrease = updatedNumberOfReplicas - oldNumberOfReplicas;
return replicaIncrease * shardsInIndex;
}

/**
* Checks to see if an operation can be performed without taking the cluster over the cluster-wide shard limit.
* Returns an error message if appropriate, or an empty {@link Optional} otherwise.
*
* @param newShards The number of shards to be added by this operation
* @param newShards The number of normal shards to be added by this operation
* @param newFrozenShards The number of frozen shards to be added by this operation
* @param state The current cluster state
* @return If present, an error message to be given as the reason for failing
* an operation. If empty, a sign that the operation is valid.
*/
public Optional<String> checkShardLimit(int newShards, ClusterState state) {
return checkShardLimit(newShards, state, getShardLimitPerNode());
private Optional<String> checkShardLimit(int newShards, int newFrozenShards, ClusterState state) {
// we verify the two limits independently. This also means that if they have mixed frozen and other data-roles nodes, such a mixed
// node can have both 1000 normal and 3000 frozen shards. This is the trade-off to keep the simplicity of the counts. We advocate
// against such mixed nodes for production use anyway.
Review comment (Member): I agree with this, good comment.
int frozenNodeCount = nodeCount(state, ShardLimitValidator::hasFrozen);
int normalNodeCount = nodeCount(state, ShardLimitValidator::hasNonFrozen);
return checkShardLimit(newShards, state, getShardLimitPerNode(), normalNodeCount, "normal")
.or(() -> checkShardLimit(newFrozenShards, state, shardLimitPerNodeFrozen.get(), frozenNodeCount, "frozen"));
}

// package-private for testing
static Optional<String> checkShardLimit(int newShards, ClusterState state, int maxShardsPerNodeSetting) {
int nodeCount = state.getNodes().getDataNodes().size();

static Optional<String> checkShardLimit(int newShards, ClusterState state, int maxShardsPerNode, int nodeCount, String group) {
// Only enforce the shard limit if we have at least one data node, so that we don't block
// index creation during cluster setup
if (nodeCount == 0 || newShards < 0) {
if (nodeCount == 0 || newShards <= 0) {
return Optional.empty();
}
int maxShardsPerNode = maxShardsPerNodeSetting;
int maxShardsInCluster = maxShardsPerNode * nodeCount;
int currentOpenShards = state.getMetadata().getTotalOpenIndexShards();

if ((currentOpenShards + newShards) > maxShardsInCluster) {
String errorMessage = "this action would add [" + newShards + "] total shards, but this cluster currently has [" +
currentOpenShards + "]/[" + maxShardsInCluster + "] maximum shards open";
return Optional.of(errorMessage);
Predicate<IndexMetadata> indexMetadataPredicate = imd ->
imd.getState().equals(IndexMetadata.State.OPEN) && group.equals(INDEX_SETTING_SHARD_LIMIT_GROUP.get(imd.getSettings()));
long currentFilteredShards = StreamSupport.stream(state.metadata().indices().values().spliterator(), false).map(oc -> oc.value)
.filter(indexMetadataPredicate).mapToInt(IndexMetadata::getTotalNumberOfShards).sum();
if ((currentFilteredShards + newShards) > maxShardsInCluster) {
String errorMessage = "this action would add [" + newShards + "] shards, but this cluster currently has [" +
currentFilteredShards + "]/[" + maxShardsInCluster + "] maximum " + group + " shards open";
return Optional.of(errorMessage);
}
}
return Optional.empty();
}

private static int nodeCount(ClusterState state, Predicate<DiscoveryNode> nodePredicate) {
return (int)
StreamSupport.stream(state.getNodes().getDataNodes().values().spliterator(), false)
.map(oc -> oc.value).filter(nodePredicate).count();
}

private static boolean hasFrozen(DiscoveryNode node) {
return node.getRoles().contains(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE);
}

private static boolean hasNonFrozen(DiscoveryNode node) {
return node.getRoles().stream().anyMatch(r -> r.canContainData() && r != DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE);
}

}
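
As a usage sketch of the new index-level group setting (the class below and its values are illustrative, not part of the PR): indices without index.shard_limit.group fall into the default "normal" group, while frozen-tier indices are expected to carry "frozen" and count against the frozen limit.

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indices.ShardLimitValidator;

public class ShardLimitGroupExample {
    public static void main(String[] args) {
        // An index flagged as frozen counts against cluster.max_shards_per_node.frozen.
        Settings frozenIndexSettings = Settings.builder()
            .put("index.shard_limit.group", "frozen")
            .build();
        System.out.println(ShardLimitValidator.INDEX_SETTING_SHARD_LIMIT_GROUP.get(frozenIndexSettings)); // frozen

        // An index without the setting uses the default "normal" group.
        System.out.println(ShardLimitValidator.INDEX_SETTING_SHARD_LIMIT_GROUP.get(Settings.EMPTY)); // normal

        // Values outside the valid groups are rejected by the setting's validator.
        try {
            ShardLimitValidator.INDEX_SETTING_SHARD_LIMIT_GROUP.get(
                Settings.builder().put("index.shard_limit.group", "cold").build());
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // reports that "cold" is not a valid shard limit group
        }
    }
}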