Skip to content

Commit da20dfd

Browse files
authored
Add cluster-wide shard limit warnings (#34021)
In a future major version, we will be introducing a soft limit on the number of shards in a cluster based on the number of nodes in the cluster. This limit will be configurable, and checked on operations which create or open shards and issue a warning if the operation would take the cluster over the limit. There is an option to enable strict enforcement of the limit, which turns the warnings into errors. In a future release, the option will be removed and strict enforcement will be the default (and only) behavior.
1 parent c5a0739 commit da20dfd

File tree

16 files changed

+597
-8
lines changed

16 files changed

+597
-8
lines changed

docs/reference/migration/migrate_7_0/cluster.asciidoc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,3 +18,10 @@ primary shards of the opened index to be allocated.
1818
[float]
1919
==== Shard preferences `_primary`, `_primary_first`, `_replica`, and `_replica_first` are removed
2020
These shard preferences are removed in favour of the `_prefer_nodes` and `_only_nodes` preferences.
21+
22+
[float]
23+
==== Cluster-wide shard soft limit
24+
Clusters now have soft limits on the total number of open shards in the cluster
25+
based on the number of nodes and the `cluster.max_shards_per_node` cluster
26+
setting, to prevent accidental operations that would destabilize the cluster.
27+
More information can be found in the <<misc-cluster,documentation for that setting>>.

docs/reference/modules/cluster/misc.asciidoc

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,48 @@ user with access to the <<cluster-update-settings,cluster-update-settings>>
2222
API can make the cluster read-write again.
2323

2424

25+
[[cluster-shard-limit]]
26+
27+
==== Cluster Shard Limit
28+
29+
In a Elasticsearch 7.0 and later, there will be a soft limit on the number of
30+
shards in a cluster, based on the number of nodes in the cluster. This is
31+
intended to prevent operations which may unintentionally destabilize the
32+
cluster. Prior to 7.0, actions which would result in the cluster going over the
33+
limit will issue a deprecation warning.
34+
35+
NOTE: You can set the system property `es.enforce_max_shards_per_node` to `true`
36+
to opt in to strict enforcement of the shard limit. If this system property is
37+
set, actions which would result in the cluster going over the limit will result
38+
in an error, rather than a deprecation warning. This property will be removed in
39+
Elasticsearch 7.0, as strict enforcement of the limit will be the default and
40+
only behavior.
41+
42+
If an operation, such as creating a new index, restoring a snapshot of an index,
43+
or opening a closed index would lead to the number of shards in the cluster
44+
going over this limit, the operation will issue a deprecation warning.
45+
46+
If the cluster is already over the limit, due to changes in node membership or
47+
setting changes, all operations that create or open indices will issue warnings
48+
until either the limit is increased as described below, or some indices are
49+
<<indices-open-close,closed>> or <<indices-delete-index,deleted>> to bring the
50+
number of shards below the limit.
51+
52+
Replicas count towards this limit, but closed indexes do not. An index with 5
53+
primary shards and 2 replicas will be counted as 15 shards. Any closed index
54+
is counted as 0, no matter how many shards and replicas it contains.
55+
56+
The limit defaults to 1,000 shards per node, and be dynamically adjusted using
57+
the following property:
58+
59+
`cluster.max_shards_per_node`::
60+
61+
Controls the number of shards allowed in the cluster per node.
62+
63+
For example, a 3-node cluster with the default setting would allow 3,000 shards
64+
total, across all open indexes. If the above setting is changed to 1,500, then
65+
the cluster would allow 4,500 shards total.
66+
2567
[[user-defined-data]]
2668
==== User Defined Cluster Metadata
2769

@@ -109,4 +151,4 @@ Enable or disable allocation for persistent tasks:
109151
This setting does not affect the persistent tasks that are already being executed.
110152
Only newly created persistent tasks, or tasks that must be reassigned (after a node
111153
left the cluster, for example), are impacted by this setting.
112-
--
154+
--

server/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import com.carrotsearch.hppc.ObjectHashSet;
2323
import com.carrotsearch.hppc.cursors.ObjectCursor;
2424
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
25-
2625
import org.apache.logging.log4j.Logger;
2726
import org.apache.lucene.util.CollectionUtil;
2827
import org.elasticsearch.action.AliasesRequest;
@@ -124,9 +123,11 @@ public enum XContentContext {
124123
public interface Custom extends NamedDiffable<Custom>, ToXContentFragment, ClusterState.FeatureAware {
125124

126125
EnumSet<XContentContext> context();
127-
128126
}
129127

128+
public static final Setting<Integer> SETTING_CLUSTER_MAX_SHARDS_PER_NODE =
129+
Setting.intSetting("cluster.max_shards_per_node", 1000, 1, Property.Dynamic, Property.NodeScope);
130+
130131
public static final Setting<Boolean> SETTING_READ_ONLY_SETTING =
131132
Setting.boolSetting("cluster.blocks.read_only", false, Property.Dynamic, Property.NodeScope);
132133

@@ -162,6 +163,7 @@ public interface Custom extends NamedDiffable<Custom>, ToXContentFragment, Clust
162163
private final ImmutableOpenMap<String, Custom> customs;
163164

164165
private final transient int totalNumberOfShards; // Transient ? not serializable anyway?
166+
private final int totalOpenIndexShards;
165167
private final int numberOfShards;
166168

167169
private final String[] allIndices;
@@ -183,12 +185,17 @@ public interface Custom extends NamedDiffable<Custom>, ToXContentFragment, Clust
183185
this.customs = customs;
184186
this.templates = templates;
185187
int totalNumberOfShards = 0;
188+
int totalOpenIndexShards = 0;
186189
int numberOfShards = 0;
187190
for (ObjectCursor<IndexMetaData> cursor : indices.values()) {
188191
totalNumberOfShards += cursor.value.getTotalNumberOfShards();
189192
numberOfShards += cursor.value.getNumberOfShards();
193+
if (IndexMetaData.State.OPEN.equals(cursor.value.getState())) {
194+
totalOpenIndexShards += cursor.value.getTotalNumberOfShards();
195+
}
190196
}
191197
this.totalNumberOfShards = totalNumberOfShards;
198+
this.totalOpenIndexShards = totalOpenIndexShards;
192199
this.numberOfShards = numberOfShards;
193200

194201
this.allIndices = allIndices;
@@ -667,10 +674,29 @@ public <T extends Custom> T custom(String type) {
667674
}
668675

669676

677+
/**
678+
* Gets the total number of shards from all indices, including replicas and
679+
* closed indices.
680+
* @return The total number shards from all indices.
681+
*/
670682
public int getTotalNumberOfShards() {
671683
return this.totalNumberOfShards;
672684
}
673685

686+
/**
687+
* Gets the total number of open shards from all indices. Includes
688+
* replicas, but does not include shards that are part of closed indices.
689+
* @return The total number of open shards from all indices.
690+
*/
691+
public int getTotalOpenIndexShards() {
692+
return this.totalOpenIndexShards;
693+
}
694+
695+
/**
696+
* Gets the number of primary shards from all indices, not including
697+
* replicas.
698+
* @return The number of primary shards from all indices.
699+
*/
674700
public int getNumberOfShards() {
675701
return this.numberOfShards;
676702
}

server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import org.elasticsearch.common.component.AbstractComponent;
5454
import org.elasticsearch.common.compress.CompressedXContent;
5555
import org.elasticsearch.common.io.PathUtils;
56+
import org.elasticsearch.common.logging.DeprecationLogger;
5657
import org.elasticsearch.common.settings.IndexScopedSettings;
5758
import org.elasticsearch.common.settings.Setting;
5859
import org.elasticsearch.common.settings.Settings;
@@ -82,6 +83,7 @@
8283
import java.util.List;
8384
import java.util.Locale;
8485
import java.util.Map;
86+
import java.util.Optional;
8587
import java.util.Set;
8688
import java.util.concurrent.atomic.AtomicInteger;
8789
import java.util.function.BiFunction;
@@ -587,19 +589,38 @@ public void onFailure(String source, Exception e) {
587589

588590
private void validate(CreateIndexClusterStateUpdateRequest request, ClusterState state) {
589591
validateIndexName(request.index(), state);
590-
validateIndexSettings(request.index(), request.settings(), forbidPrivateIndexSettings);
592+
validateIndexSettings(request.index(), request.settings(), state, forbidPrivateIndexSettings);
591593
}
592594

593-
public void validateIndexSettings(
594-
final String indexName, final Settings settings, final boolean forbidPrivateIndexSettings) throws IndexCreationException {
595+
public void validateIndexSettings(String indexName, final Settings settings, final ClusterState clusterState,
596+
final boolean forbidPrivateIndexSettings) throws IndexCreationException {
595597
List<String> validationErrors = getIndexSettingsValidationErrors(settings, forbidPrivateIndexSettings);
598+
599+
Optional<String> shardAllocation = checkShardLimit(settings, clusterState, deprecationLogger);
600+
shardAllocation.ifPresent(validationErrors::add);
601+
596602
if (validationErrors.isEmpty() == false) {
597603
ValidationException validationException = new ValidationException();
598604
validationException.addValidationErrors(validationErrors);
599605
throw new IndexCreationException(indexName, validationException);
600606
}
601607
}
602608

609+
/**
610+
* Checks whether an index can be created without going over the cluster shard limit.
611+
*
612+
* @param settings The settings of the index to be created.
613+
* @param clusterState The current cluster state.
614+
* @param deprecationLogger The logger to use to emit a deprecation warning, if appropriate.
615+
* @return If present, an error message to be used to reject index creation. If empty, a signal that this operation may be carried out.
616+
*/
617+
static Optional<String> checkShardLimit(Settings settings, ClusterState clusterState, DeprecationLogger deprecationLogger) {
618+
int shardsToCreate = IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.get(settings)
619+
* (1 + IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.get(settings));
620+
621+
return IndicesService.checkShardLimit(shardsToCreate, clusterState, deprecationLogger);
622+
}
623+
603624
List<String> getIndexSettingsValidationErrors(final Settings settings, final boolean forbidPrivateIndexSettings) {
604625
String customPath = IndexMetaData.INDEX_DATA_PATH_SETTING.get(settings);
605626
List<String> validationErrors = new ArrayList<>();

server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,10 @@
3636
import org.elasticsearch.cluster.routing.allocation.AllocationService;
3737
import org.elasticsearch.cluster.service.ClusterService;
3838
import org.elasticsearch.common.Priority;
39+
import org.elasticsearch.common.ValidationException;
3940
import org.elasticsearch.common.component.AbstractComponent;
4041
import org.elasticsearch.common.inject.Inject;
42+
import org.elasticsearch.common.logging.DeprecationLogger;
4143
import org.elasticsearch.common.settings.Settings;
4244
import org.elasticsearch.index.Index;
4345
import org.elasticsearch.indices.IndicesService;
@@ -50,6 +52,7 @@
5052
import java.util.Arrays;
5153
import java.util.HashSet;
5254
import java.util.List;
55+
import java.util.Optional;
5356
import java.util.Set;
5457

5558
/**
@@ -175,6 +178,8 @@ public ClusterState execute(ClusterState currentState) {
175178
}
176179
}
177180

181+
validateShardLimit(currentState, request.indices(), deprecationLogger);
182+
178183
if (indicesToOpen.isEmpty()) {
179184
return currentState;
180185
}
@@ -217,4 +222,33 @@ public ClusterState execute(ClusterState currentState) {
217222
});
218223
}
219224

225+
/**
226+
* Validates whether a list of indices can be opened without going over the cluster shard limit. Only counts indices which are
227+
* currently closed and will be opened, ignores indices which are already open.
228+
*
229+
* @param currentState The current cluster state.
230+
* @param indices The indices which are to be opened.
231+
* @param deprecationLogger The logger to use to emit a deprecation warning, if appropriate.
232+
* @throws ValidationException If this operation would take the cluster over the limit and enforcement is enabled.
233+
*/
234+
static void validateShardLimit(ClusterState currentState, Index[] indices, DeprecationLogger deprecationLogger) {
235+
int shardsToOpen = Arrays.stream(indices)
236+
.filter(index -> currentState.metaData().index(index).getState().equals(IndexMetaData.State.CLOSE))
237+
.mapToInt(index -> getTotalShardCount(currentState, index))
238+
.sum();
239+
240+
Optional<String> error = IndicesService.checkShardLimit(shardsToOpen, currentState, deprecationLogger);
241+
if (error.isPresent()) {
242+
ValidationException ex = new ValidationException();
243+
ex.addValidationError(error.get());
244+
throw ex;
245+
}
246+
247+
}
248+
249+
private static int getTotalShardCount(ClusterState state, Index index) {
250+
IndexMetaData indexMetaData = state.metaData().index(index);
251+
return indexMetaData.getNumberOfShards() * (1 + indexMetaData.getNumberOfReplicas());
252+
}
253+
220254
}

server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataUpdateSettingsService.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.elasticsearch.cluster.routing.allocation.AllocationService;
3434
import org.elasticsearch.cluster.service.ClusterService;
3535
import org.elasticsearch.common.Priority;
36+
import org.elasticsearch.common.ValidationException;
3637
import org.elasticsearch.common.collect.Tuple;
3738
import org.elasticsearch.common.component.AbstractComponent;
3839
import org.elasticsearch.common.inject.Inject;
@@ -45,9 +46,11 @@
4546
import org.elasticsearch.threadpool.ThreadPool;
4647

4748
import java.io.IOException;
49+
import java.util.Arrays;
4850
import java.util.HashSet;
4951
import java.util.Locale;
5052
import java.util.Map;
53+
import java.util.Optional;
5154
import java.util.Set;
5255

5356
import static org.elasticsearch.action.support.ContextPreservingActionListener.wrapPreservingContext;
@@ -115,6 +118,7 @@ protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {
115118

116119
@Override
117120
public ClusterState execute(ClusterState currentState) {
121+
118122
RoutingTable.Builder routingTableBuilder = RoutingTable.builder(currentState.routingTable());
119123
MetaData.Builder metaDataBuilder = MetaData.builder(currentState.metaData());
120124

@@ -141,6 +145,18 @@ public ClusterState execute(ClusterState currentState) {
141145

142146
int updatedNumberOfReplicas = openSettings.getAsInt(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, -1);
143147
if (updatedNumberOfReplicas != -1 && preserveExisting == false) {
148+
149+
// Verify that this won't take us over the cluster shard limit.
150+
int totalNewShards = Arrays.stream(request.indices())
151+
.mapToInt(i -> getTotalNewShards(i, currentState, updatedNumberOfReplicas))
152+
.sum();
153+
Optional<String> error = IndicesService.checkShardLimit(totalNewShards, currentState, deprecationLogger);
154+
if (error.isPresent()) {
155+
ValidationException ex = new ValidationException();
156+
ex.addValidationError(error.get());
157+
throw ex;
158+
}
159+
144160
// we do *not* update the in sync allocation ids as they will be removed upon the first index
145161
// operation which make these copies stale
146162
// TODO: update the list once the data is deleted by the node?
@@ -224,6 +240,14 @@ public ClusterState execute(ClusterState currentState) {
224240
});
225241
}
226242

243+
private int getTotalNewShards(Index index, ClusterState currentState, int updatedNumberOfReplicas) {
244+
IndexMetaData indexMetaData = currentState.metaData().index(index);
245+
int shardsInIndex = indexMetaData.getNumberOfShards();
246+
int oldNumberOfReplicas = indexMetaData.getNumberOfReplicas();
247+
int replicaIncrease = updatedNumberOfReplicas - oldNumberOfReplicas;
248+
return replicaIncrease * shardsInIndex;
249+
}
250+
227251
/**
228252
* Updates the cluster block only iff the setting exists in the given settings
229253
*/

server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,7 @@ public void apply(Settings value, Settings current, Settings previous) {
196196
MappingUpdatedAction.INDICES_MAPPING_DYNAMIC_TIMEOUT_SETTING,
197197
MetaData.SETTING_READ_ONLY_SETTING,
198198
MetaData.SETTING_READ_ONLY_ALLOW_DELETE_SETTING,
199+
MetaData.SETTING_CLUSTER_MAX_SHARDS_PER_NODE,
199200
RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING,
200201
RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING,
201202
RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING,

0 commit comments

Comments
 (0)