Skip to content

Commit fadbe0d

Browse files
authored
Automatically prepare indices for splitting (#27451)
Today we require users to prepare their indices for split operations. Yet, we can do this automatically when an index is created which would make the split feature a much more appealing option since it doesn't have any 3rd party prerequisites anymore. This change automatically sets the number of routinng shards such that an index is guaranteed to be able to split once into twice as many shards. The number of routing shards is scaled towards the default shard limit per index such that indices with a smaller amount of shards can be split more often than larger ones. For instance an index with 1 or 2 shards can be split 10x (until it approaches 1024 shards) while an index created with 128 shards can only be split 3x by a factor of 2. Please note this is just a default value and users can still prepare their indices with `index.number_of_routing_shards` for custom splitting. NOTE: this change has an impact on the document distribution since we are changing the hash space. Documents are still uniformly distributed across all shards but since we are artificually changing the number of buckets in the consistent hashign space document might be hashed into different shards compared to previous versions. This is a 7.0 only change.
1 parent 05998f9 commit fadbe0d

File tree

40 files changed

+493
-148
lines changed

40 files changed

+493
-148
lines changed

client/rest-high-level/src/test/java/org/elasticsearch/client/SearchIT.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,7 @@ public void testSearchWithMatrixStats() throws IOException {
264264
assertEquals(5, matrixStats.getFieldCount("num"));
265265
assertEquals(56d, matrixStats.getMean("num"), 0d);
266266
assertEquals(1830d, matrixStats.getVariance("num"), 0d);
267-
assertEquals(0.09340198804973057, matrixStats.getSkewness("num"), 0d);
267+
assertEquals(0.09340198804973046, matrixStats.getSkewness("num"), 0d);
268268
assertEquals(1.2741646510794589, matrixStats.getKurtosis("num"), 0d);
269269
assertEquals(5, matrixStats.getFieldCount("num2"));
270270
assertEquals(29d, matrixStats.getMean("num2"), 0d);

core/src/main/java/org/elasticsearch/action/admin/indices/shrink/TransportResizeAction.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -163,9 +163,14 @@ static CreateIndexClusterStateUpdateRequest prepareCreateIndexRequest(final Resi
163163

164164
if (IndexMetaData.INDEX_ROUTING_PARTITION_SIZE_SETTING.exists(targetIndexSettings)) {
165165
throw new IllegalArgumentException("cannot provide a routing partition size value when resizing an index");
166+
166167
}
167168
if (IndexMetaData.INDEX_NUMBER_OF_ROUTING_SHARDS_SETTING.exists(targetIndexSettings)) {
168-
throw new IllegalArgumentException("cannot provide index.number_of_routing_shards on resize");
169+
// if we have a source index with 1 shards it's legal to set this
170+
final boolean splitFromSingleShards = resizeRequest.getResizeType() == ResizeType.SPLIT && metaData.getNumberOfShards() == 1;
171+
if (splitFromSingleShards == false) {
172+
throw new IllegalArgumentException("cannot provide index.number_of_routing_shards on resize");
173+
}
169174
}
170175
String cause = resizeRequest.getResizeType().name().toLowerCase(Locale.ROOT) + "_index";
171176
targetIndex.cause(cause);

core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java

+14-6
Original file line numberDiff line numberDiff line change
@@ -1333,25 +1333,33 @@ public int getRoutingFactor() {
13331333
* @return a the source shard ID to split off from
13341334
*/
13351335
public static ShardId selectSplitShard(int shardId, IndexMetaData sourceIndexMetadata, int numTargetShards) {
1336+
int numSourceShards = sourceIndexMetadata.getNumberOfShards();
13361337
if (shardId >= numTargetShards) {
13371338
throw new IllegalArgumentException("the number of target shards (" + numTargetShards + ") must be greater than the shard id: "
13381339
+ shardId);
13391340
}
1340-
int numSourceShards = sourceIndexMetadata.getNumberOfShards();
1341+
final int routingFactor = getRoutingFactor(numSourceShards, numTargetShards);
1342+
assertSplitMetadata(numSourceShards, numTargetShards, sourceIndexMetadata);
1343+
return new ShardId(sourceIndexMetadata.getIndex(), shardId/routingFactor);
1344+
}
1345+
1346+
private static void assertSplitMetadata(int numSourceShards, int numTargetShards, IndexMetaData sourceIndexMetadata) {
13411347
if (numSourceShards > numTargetShards) {
13421348
throw new IllegalArgumentException("the number of source shards [" + numSourceShards
1343-
+ "] must be less that the number of target shards [" + numTargetShards + "]");
1349+
+ "] must be less that the number of target shards [" + numTargetShards + "]");
13441350
}
1345-
int routingFactor = getRoutingFactor(numSourceShards, numTargetShards);
13461351
// now we verify that the numRoutingShards is valid in the source index
1347-
int routingNumShards = sourceIndexMetadata.getRoutingNumShards();
1352+
// note: if the number of shards is 1 in the source index we can just assume it's correct since from 1 we can split into anything
1353+
// this is important to special case here since we use this to validate this in various places in the code but allow to split form
1354+
// 1 to N but we never modify the sourceIndexMetadata to accommodate for that
1355+
int routingNumShards = numSourceShards == 1 ? numTargetShards : sourceIndexMetadata.getRoutingNumShards();
13481356
if (routingNumShards % numTargetShards != 0) {
13491357
throw new IllegalStateException("the number of routing shards ["
13501358
+ routingNumShards + "] must be a multiple of the target shards [" + numTargetShards + "]");
13511359
}
13521360
// this is just an additional assertion that ensures we are a factor of the routing num shards.
1353-
assert getRoutingFactor(numTargetShards, sourceIndexMetadata.getRoutingNumShards()) >= 0;
1354-
return new ShardId(sourceIndexMetadata.getIndex(), shardId/routingFactor);
1361+
assert sourceIndexMetadata.getNumberOfShards() == 1 // special case - we can split into anything from 1 shard
1362+
|| getRoutingFactor(numTargetShards, routingNumShards) >= 0;
13551363
}
13561364

13571365
/**

core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java

+38-7
Original file line numberDiff line numberDiff line change
@@ -379,15 +379,24 @@ public ClusterState execute(ClusterState currentState) throws Exception {
379379
indexSettingsBuilder.put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, request.getProvidedName());
380380
indexSettingsBuilder.put(SETTING_INDEX_UUID, UUIDs.randomBase64UUID());
381381
final IndexMetaData.Builder tmpImdBuilder = IndexMetaData.builder(request.index());
382-
382+
final Settings idxSettings = indexSettingsBuilder.build();
383+
int numTargetShards = IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.get(idxSettings);
383384
final int routingNumShards;
384-
if (recoverFromIndex == null) {
385-
Settings idxSettings = indexSettingsBuilder.build();
386-
routingNumShards = IndexMetaData.INDEX_NUMBER_OF_ROUTING_SHARDS_SETTING.get(idxSettings);
385+
final Version indexVersionCreated = idxSettings.getAsVersion(IndexMetaData.SETTING_VERSION_CREATED, null);
386+
final IndexMetaData sourceMetaData = recoverFromIndex == null ? null :
387+
currentState.metaData().getIndexSafe(recoverFromIndex);
388+
if (sourceMetaData == null || sourceMetaData.getNumberOfShards() == 1) {
389+
// in this case we either have no index to recover from or
390+
// we have a source index with 1 shard and without an explicit split factor
391+
// or one that is valid in that case we can split into whatever and auto-generate a new factor.
392+
if (IndexMetaData.INDEX_NUMBER_OF_ROUTING_SHARDS_SETTING.exists(idxSettings)) {
393+
routingNumShards = IndexMetaData.INDEX_NUMBER_OF_ROUTING_SHARDS_SETTING.get(idxSettings);
394+
} else {
395+
routingNumShards = calculateNumRoutingShards(numTargetShards, indexVersionCreated);
396+
}
387397
} else {
388398
assert IndexMetaData.INDEX_NUMBER_OF_ROUTING_SHARDS_SETTING.exists(indexSettingsBuilder.build()) == false
389-
: "index.number_of_routing_shards should be present on the target index on resize";
390-
final IndexMetaData sourceMetaData = currentState.metaData().getIndexSafe(recoverFromIndex);
399+
: "index.number_of_routing_shards should not be present on the target index on resize";
391400
routingNumShards = sourceMetaData.getRoutingNumShards();
392401
}
393402
// remove the setting it's temporary and is only relevant once we create the index
@@ -408,7 +417,6 @@ public ClusterState execute(ClusterState currentState) throws Exception {
408417
* the maximum primary term on all the shards in the source index. This ensures that we have correct
409418
* document-level semantics regarding sequence numbers in the shrunken index.
410419
*/
411-
final IndexMetaData sourceMetaData = currentState.metaData().getIndexSafe(recoverFromIndex);
412420
final long primaryTerm =
413421
IntStream
414422
.range(0, sourceMetaData.getNumberOfShards())
@@ -717,4 +725,27 @@ static void prepareResizeIndexSettings(ClusterState currentState, Set<String> ma
717725
.put(IndexMetaData.INDEX_RESIZE_SOURCE_NAME.getKey(), resizeSourceIndex.getName())
718726
.put(IndexMetaData.INDEX_RESIZE_SOURCE_UUID.getKey(), resizeSourceIndex.getUUID());
719727
}
728+
729+
/**
730+
* Returns a default number of routing shards based on the number of shards of the index. The default number of routing shards will
731+
* allow any index to be split at least once and at most 10 times by a factor of two. The closer the number or shards gets to 1024
732+
* the less default split operations are supported
733+
*/
734+
public static int calculateNumRoutingShards(int numShards, Version indexVersionCreated) {
735+
if (indexVersionCreated.onOrAfter(Version.V_7_0_0_alpha1)) {
736+
// only select this automatically for indices that are created on or after 7.0 this will prevent this new behaviour
737+
// until we have a fully upgraded cluster. Additionally it will make integratin testing easier since mixed clusters
738+
// will always have the behavior of the min node in the cluster.
739+
//
740+
// We use as a default number of routing shards the higher number that can be expressed
741+
// as {@code numShards * 2^x`} that is less than or equal to the maximum number of shards: 1024.
742+
int log2MaxNumShards = 10; // logBase2(1024)
743+
int log2NumShards = 32 - Integer.numberOfLeadingZeros(numShards - 1); // ceil(logBase2(numShards))
744+
int numSplits = log2MaxNumShards - log2NumShards;
745+
numSplits = Math.max(1, numSplits); // Ensure the index can be split at least once
746+
return numShards * 1 << numSplits;
747+
} else {
748+
return numShards;
749+
}
750+
}
720751
}

core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java

+1
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,7 @@ public void testInvalidPartitionSize() {
317317
response = prepareCreate("test_" + shards + "_" + partitionSize)
318318
.setSettings(Settings.builder()
319319
.put("index.number_of_shards", shards)
320+
.put("index.number_of_routing_shards", shards)
320321
.put("index.routing_partition_size", partitionSize))
321322
.execute().actionGet();
322323
} catch (IllegalStateException | IllegalArgumentException e) {

core/src/test/java/org/elasticsearch/action/admin/indices/create/SplitIndexIT.java

+37-16
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.elasticsearch.client.Client;
4040
import org.elasticsearch.cluster.ClusterState;
4141
import org.elasticsearch.cluster.metadata.IndexMetaData;
42+
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
4243
import org.elasticsearch.cluster.node.DiscoveryNode;
4344
import org.elasticsearch.cluster.routing.Murmur3HashFunction;
4445
import org.elasticsearch.cluster.routing.ShardRouting;
@@ -66,7 +67,6 @@
6667
import java.util.List;
6768
import java.util.Set;
6869
import java.util.function.BiFunction;
69-
import java.util.function.IntFunction;
7070
import java.util.stream.IntStream;
7171

7272
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
@@ -88,21 +88,40 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
8888
}
8989

9090
public void testCreateSplitIndexToN() throws IOException {
91-
int[][] possibleShardSplits = new int[][] {{2,4,8}, {3, 6, 12}, {1, 2, 4}};
91+
int[][] possibleShardSplits = new int[][]{{2, 4, 8}, {3, 6, 12}, {1, 2, 4}};
9292
int[] shardSplits = randomFrom(possibleShardSplits);
93-
assertEquals(shardSplits[0], (shardSplits[0] * shardSplits[1]) / shardSplits[1]);
94-
assertEquals(shardSplits[1], (shardSplits[1] * shardSplits[2]) / shardSplits[2]);
93+
splitToN(shardSplits[0], shardSplits[1], shardSplits[2]);
94+
}
95+
96+
public void testSplitFromOneToN() {
97+
splitToN(1, 5, 10);
98+
client().admin().indices().prepareDelete("*").get();
99+
int randomSplit = randomIntBetween(2, 6);
100+
splitToN(1, randomSplit, randomSplit * 2);
101+
}
102+
103+
private void splitToN(int sourceShards, int firstSplitShards, int secondSplitShards) {
104+
105+
assertEquals(sourceShards, (sourceShards * firstSplitShards) / firstSplitShards);
106+
assertEquals(firstSplitShards, (firstSplitShards * secondSplitShards) / secondSplitShards);
95107
internalCluster().ensureAtLeastNumDataNodes(2);
96108
final boolean useRouting = randomBoolean();
97109
final boolean useNested = randomBoolean();
98110
final boolean useMixedRouting = useRouting ? randomBoolean() : false;
99111
CreateIndexRequestBuilder createInitialIndex = prepareCreate("source");
100-
final int routingShards = shardSplits[2] * randomIntBetween(1, 10);
101-
Settings.Builder settings = Settings.builder().put(indexSettings())
102-
.put("number_of_shards", shardSplits[0])
103-
.put("index.number_of_routing_shards", routingShards);
104-
if (useRouting && useMixedRouting == false && randomBoolean()) {
105-
settings.put("index.routing_partition_size", randomIntBetween(1, routingShards - 1));
112+
Settings.Builder settings = Settings.builder().put(indexSettings()).put("number_of_shards", sourceShards);
113+
final boolean useRoutingPartition;
114+
if (randomBoolean()) {
115+
// randomly set the value manually
116+
int routingShards = secondSplitShards * randomIntBetween(1, 10);
117+
settings.put("index.number_of_routing_shards", routingShards);
118+
useRoutingPartition = false;
119+
} else {
120+
useRoutingPartition = randomBoolean();
121+
}
122+
if (useRouting && useMixedRouting == false && useRoutingPartition) {
123+
settings.put("index.routing_partition_size",
124+
randomIntBetween(1, MetaDataCreateIndexService.calculateNumRoutingShards(sourceShards, Version.CURRENT)-1));
106125
if (useNested) {
107126
createInitialIndex.addMapping("t1", "_routing", "required=true", "nested1", "type=nested");
108127
} else {
@@ -172,11 +191,15 @@ public void testCreateSplitIndexToN() throws IOException {
172191
.setSettings(Settings.builder()
173192
.put("index.blocks.write", true)).get();
174193
ensureGreen();
194+
Settings.Builder firstSplitSettingsBuilder = Settings.builder()
195+
.put("index.number_of_replicas", 0)
196+
.put("index.number_of_shards", firstSplitShards);
197+
if (sourceShards == 1 && useRoutingPartition == false && randomBoolean()) { // try to set it if we have a source index with 1 shard
198+
firstSplitSettingsBuilder.put("index.number_of_routing_shards", secondSplitShards);
199+
}
175200
assertAcked(client().admin().indices().prepareResizeIndex("source", "first_split")
176201
.setResizeType(ResizeType.SPLIT)
177-
.setSettings(Settings.builder()
178-
.put("index.number_of_replicas", 0)
179-
.put("index.number_of_shards", shardSplits[1]).build()).get());
202+
.setSettings(firstSplitSettingsBuilder.build()).get());
180203
ensureGreen();
181204
assertHitCount(client().prepareSearch("first_split").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), numDocs);
182205

@@ -204,7 +227,7 @@ public void testCreateSplitIndexToN() throws IOException {
204227
.setResizeType(ResizeType.SPLIT)
205228
.setSettings(Settings.builder()
206229
.put("index.number_of_replicas", 0)
207-
.put("index.number_of_shards", shardSplits[2]).build()).get());
230+
.put("index.number_of_shards", secondSplitShards).build()).get());
208231
ensureGreen();
209232
assertHitCount(client().prepareSearch("second_split").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), numDocs);
210233
// let it be allocated anywhere and bump replicas
@@ -340,7 +363,6 @@ public void testCreateSplitIndex() {
340363
prepareCreate("source").setSettings(Settings.builder().put(indexSettings())
341364
.put("number_of_shards", 1)
342365
.put("index.version.created", version)
343-
.put("index.number_of_routing_shards", 2)
344366
).get();
345367
final int docs = randomIntBetween(0, 128);
346368
for (int i = 0; i < docs; i++) {
@@ -443,7 +465,6 @@ public void testCreateSplitWithIndexSort() throws Exception {
443465
Settings.builder()
444466
.put(indexSettings())
445467
.put("sort.field", "id")
446-
.put("index.number_of_routing_shards", 16)
447468
.put("sort.order", "desc")
448469
.put("number_of_shards", 2)
449470
.put("number_of_replicas", 0)

0 commit comments

Comments
 (0)