Automatically prepare indices for splitting #27451


Merged: 24 commits, Nov 23, 2017
Commits (24)
005215e
Automatically prepare indices for splitting
s1monw Nov 16, 2017
4072379
prevent automatic num routing shards for pre 7.0 indices
s1monw Nov 20, 2017
6ab582e
fix auto routing shards for 1 shard source indices
s1monw Nov 20, 2017
0e3fbbb
Merge branch 'master' into prepare_for_split
s1monw Nov 20, 2017
6d90ad8
add additional verification for broken split setups
s1monw Nov 20, 2017
e4da795
fix test:
s1monw Nov 21, 2017
033c6ad
Merge branch 'master' into prepare_for_split
s1monw Nov 21, 2017
fb13969
Improved index-split docs
clintongormley Nov 21, 2017
27a6d34
apply review comments
s1monw Nov 21, 2017
71f5c35
Merge pull request #3 from clintongormley/prepare_for_split
s1monw Nov 21, 2017
9262c9e
fix unittest
s1monw Nov 21, 2017
8482a60
stabelize SplitIndexIT
s1monw Nov 21, 2017
6fe2bff
Fix routing test to actually be sane
s1monw Nov 21, 2017
b02b9f3
fix SharedSignificantTermsTestMethods tests
s1monw Nov 21, 2017
8176378
fix SimpleRoutingIT again
s1monw Nov 22, 2017
1f46ce3
use a factor but incompatible one
s1monw Nov 22, 2017
a6616cd
bound num routing shards to 1024
s1monw Nov 22, 2017
b2a9a08
fix comments
s1monw Nov 22, 2017
dccad0b
fix tests for new hashing
s1monw Nov 22, 2017
36aca55
Merge branch 'master' into prepare_for_split
s1monw Nov 22, 2017
1cd9c78
Merge branch 'master' into prepare_for_split
s1monw Nov 22, 2017
e820289
Merge branch 'master' into prepare_for_split
s1monw Nov 22, 2017
791c2f1
Merge branch 'master' into prepare_for_split
s1monw Nov 23, 2017
4db1b97
add note to migration guide
s1monw Nov 23, 2017
@@ -264,7 +264,7 @@ public void testSearchWithMatrixStats() throws IOException {
assertEquals(5, matrixStats.getFieldCount("num"));
assertEquals(56d, matrixStats.getMean("num"), 0d);
assertEquals(1830d, matrixStats.getVariance("num"), 0d);
assertEquals(0.09340198804973057, matrixStats.getSkewness("num"), 0d);
assertEquals(0.09340198804973046, matrixStats.getSkewness("num"), 0d);
assertEquals(1.2741646510794589, matrixStats.getKurtosis("num"), 0d);
assertEquals(5, matrixStats.getFieldCount("num2"));
assertEquals(29d, matrixStats.getMean("num2"), 0d);
@@ -163,9 +163,14 @@ static CreateIndexClusterStateUpdateRequest prepareCreateIndexRequest(final Resi

if (IndexMetaData.INDEX_ROUTING_PARTITION_SIZE_SETTING.exists(targetIndexSettings)) {
throw new IllegalArgumentException("cannot provide a routing partition size value when resizing an index");

}
if (IndexMetaData.INDEX_NUMBER_OF_ROUTING_SHARDS_SETTING.exists(targetIndexSettings)) {
throw new IllegalArgumentException("cannot provide index.number_of_routing_shards on resize");
// if we have a source index with 1 shard it's legal to set this
final boolean splitFromSingleShards = resizeRequest.getResizeType() == ResizeType.SPLIT && metaData.getNumberOfShards() == 1;
if (splitFromSingleShards == false) {
throw new IllegalArgumentException("cannot provide index.number_of_routing_shards on resize");
}
}
String cause = resizeRequest.getResizeType().name().toLowerCase(Locale.ROOT) + "_index";
targetIndex.cause(cause);
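
A minimal, test-style sketch (index names and settings values are made up; it reuses the prepareCreate/prepareResizeIndex helpers that appear in the SplitIndexIT changes further down) of the one case the relaxed check above now allows, namely supplying index.number_of_routing_shards when splitting a single-shard source:

// Sketch only: a 1-shard source may be split with an explicit
// index.number_of_routing_shards; any other resize still rejects the setting.
prepareCreate("source").setSettings(Settings.builder()
    .put("index.number_of_shards", 1)).get();
client().admin().indices().prepareUpdateSettings("source")
    .setSettings(Settings.builder().put("index.blocks.write", true)).get();
client().admin().indices().prepareResizeIndex("source", "target")
    .setResizeType(ResizeType.SPLIT)
    .setSettings(Settings.builder()
        .put("index.number_of_shards", 4)
        .put("index.number_of_routing_shards", 8) // legal only because the source has one shard
        .build())
    .get();
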
@@ -1333,25 +1333,33 @@ public int getRoutingFactor() {
* @return a the source shard ID to split off from
*/
public static ShardId selectSplitShard(int shardId, IndexMetaData sourceIndexMetadata, int numTargetShards) {
int numSourceShards = sourceIndexMetadata.getNumberOfShards();
if (shardId >= numTargetShards) {
throw new IllegalArgumentException("the number of target shards (" + numTargetShards + ") must be greater than the shard id: "
+ shardId);
}
int numSourceShards = sourceIndexMetadata.getNumberOfShards();
final int routingFactor = getRoutingFactor(numSourceShards, numTargetShards);
assertSplitMetadata(numSourceShards, numTargetShards, sourceIndexMetadata);
return new ShardId(sourceIndexMetadata.getIndex(), shardId/routingFactor);
}

private static void assertSplitMetadata(int numSourceShards, int numTargetShards, IndexMetaData sourceIndexMetadata) {
if (numSourceShards > numTargetShards) {
throw new IllegalArgumentException("the number of source shards [" + numSourceShards
+ "] must be less that the number of target shards [" + numTargetShards + "]");
+ "] must be less that the number of target shards [" + numTargetShards + "]");
}
int routingFactor = getRoutingFactor(numSourceShards, numTargetShards);
// now we verify that the numRoutingShards is valid in the source index
int routingNumShards = sourceIndexMetadata.getRoutingNumShards();
// note: if the number of shards is 1 in the source index we can just assume it's correct since from 1 we can split into anything
// this is important to special case here since we use this to validate this in various places in the code but allow to split form
// 1 to N but we never modify the sourceIndexMetadata to accommodate for that
int routingNumShards = numSourceShards == 1 ? numTargetShards : sourceIndexMetadata.getRoutingNumShards();
Contributor:
nit: maybe do something like:

final int selectedShard;
if (numSourceShards == 1) {
  selectedShard = 0;
} else {
  ... the current logic...
  selectedShard = shardId/routingFactor;
}
return new ShardId(sourceIndexMetadata.getIndex(), selectedShard);

will be easier to read, I think.

Contributor: also s/form/from/

if (routingNumShards % numTargetShards != 0) {
throw new IllegalStateException("the number of routing shards ["
+ routingNumShards + "] must be a multiple of the target shards [" + numTargetShards + "]");
}
// this is just an additional assertion that ensures we are a factor of the routing num shards.
assert getRoutingFactor(numTargetShards, sourceIndexMetadata.getRoutingNumShards()) >= 0;
return new ShardId(sourceIndexMetadata.getIndex(), shardId/routingFactor);
assert sourceIndexMetadata.getNumberOfShards() == 1 // special case - we can split into anything from 1 shard
|| getRoutingFactor(numTargetShards, routingNumShards) >= 0;
}

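
A small standalone sketch (not the actual Elasticsearch method; it assumes the split routing factor is numTargetShards / numSourceShards, as the code above implies) of which source shard a given target shard recovers from:

// Simplified sketch of selectSplitShard's mapping from target shard to source shard.
static int sourceShardForSplit(int targetShardId, int numSourceShards, int numTargetShards) {
    if (targetShardId >= numTargetShards) {
        throw new IllegalArgumentException("target shard id out of range");
    }
    if (numTargetShards % numSourceShards != 0) {
        throw new IllegalArgumentException("target shards must be a multiple of source shards");
    }
    int routingFactor = numTargetShards / numSourceShards; // e.g. 2 -> 8 gives 4
    return targetShardId / routingFactor; // a 1-shard source always maps to shard 0
}

For example, splitting 2 shards into 8, target shard 5 recovers from source shard 5 / 4 = 1; splitting a 1-shard source into anything always recovers from shard 0.
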
@@ -379,15 +379,24 @@ public ClusterState execute(ClusterState currentState) throws Exception {
indexSettingsBuilder.put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, request.getProvidedName());
indexSettingsBuilder.put(SETTING_INDEX_UUID, UUIDs.randomBase64UUID());
final IndexMetaData.Builder tmpImdBuilder = IndexMetaData.builder(request.index());

final Settings idxSettings = indexSettingsBuilder.build();
int numTargetShards = IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.get(idxSettings);
final int routingNumShards;
if (recoverFromIndex == null) {
Settings idxSettings = indexSettingsBuilder.build();
routingNumShards = IndexMetaData.INDEX_NUMBER_OF_ROUTING_SHARDS_SETTING.get(idxSettings);
final Version indexVersionCreated = idxSettings.getAsVersion(IndexMetaData.SETTING_VERSION_CREATED, null);
final IndexMetaData sourceMetaData = recoverFromIndex == null ? null :
currentState.metaData().getIndexSafe(recoverFromIndex);
if (sourceMetaData == null || sourceMetaData.getNumberOfShards() == 1) {
// in this case we either have no index to recover from or
// we have a source index with 1 shard and without an explicit split factor
// or one that is valid in that case we can split into whatever and auto-generate a new factor.
if (IndexMetaData.INDEX_NUMBER_OF_ROUTING_SHARDS_SETTING.exists(idxSettings)) {
routingNumShards = IndexMetaData.INDEX_NUMBER_OF_ROUTING_SHARDS_SETTING.get(idxSettings);
} else {
routingNumShards = calculateNumRoutingShards(numTargetShards, indexVersionCreated);
}
} else {
assert IndexMetaData.INDEX_NUMBER_OF_ROUTING_SHARDS_SETTING.exists(indexSettingsBuilder.build()) == false
: "index.number_of_routing_shards should be present on the target index on resize";
final IndexMetaData sourceMetaData = currentState.metaData().getIndexSafe(recoverFromIndex);
: "index.number_of_routing_shards should not be present on the target index on resize";
routingNumShards = sourceMetaData.getRoutingNumShards();
}
// remove the setting it's temporary and is only relevant once we create the index
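
A standalone sketch (illustrative only; the helper name is made up) restating the branches added above, i.e. how routingNumShards is now chosen at index-creation time:

// Illustrative summary of the routingNumShards decision, not the actual method.
static int chooseRoutingNumShards(Settings idxSettings, IndexMetaData sourceMetaData,
                                  int numTargetShards, Version indexVersionCreated) {
    if (sourceMetaData == null || sourceMetaData.getNumberOfShards() == 1) {
        // no resize source, or a single-shard source: an explicit setting wins,
        // otherwise auto-generate a factor that leaves room for future splits
        if (IndexMetaData.INDEX_NUMBER_OF_ROUTING_SHARDS_SETTING.exists(idxSettings)) {
            return IndexMetaData.INDEX_NUMBER_OF_ROUTING_SHARDS_SETTING.get(idxSettings);
        }
        return MetaDataCreateIndexService.calculateNumRoutingShards(numTargetShards, indexVersionCreated);
    }
    // resizing from a multi-shard source: inherit the source's routing shards
    return sourceMetaData.getRoutingNumShards();
}
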
@@ -408,7 +417,6 @@ public ClusterState execute(ClusterState currentState) throws Exception {
* the maximum primary term on all the shards in the source index. This ensures that we have correct
* document-level semantics regarding sequence numbers in the shrunken index.
*/
final IndexMetaData sourceMetaData = currentState.metaData().getIndexSafe(recoverFromIndex);
final long primaryTerm =
IntStream
.range(0, sourceMetaData.getNumberOfShards())
@@ -717,4 +725,27 @@ static void prepareResizeIndexSettings(ClusterState currentState, Set<String> ma
.put(IndexMetaData.INDEX_RESIZE_SOURCE_NAME.getKey(), resizeSourceIndex.getName())
.put(IndexMetaData.INDEX_RESIZE_SOURCE_UUID.getKey(), resizeSourceIndex.getUUID());
}

/**
* Returns a default number of routing shards based on the number of shards of the index. The default number of routing shards will
* allow any index to be split at least once and at most 10 times by a factor of two. The closer the number of shards gets to 1024
* the fewer default split operations are supported
*/
public static int calculateNumRoutingShards(int numShards, Version indexVersionCreated) {
if (indexVersionCreated.onOrAfter(Version.V_7_0_0_alpha1)) {

Contributor: can you clarify why this needs to be version dependent?

Contributor Author: there is a comment in the line below?!

Contributor: yeah, I get this means we only do the new behavior until the cluster is fully upgraded, but I don't see why we care? I mean, if the master is a 7.0.0 master, we can start creating indices with a different hashing logic and not worry about it?

Contributor Author: it's mainly for testing purposes and BWC behavior being more predictable, otherwise some rest tests will randomly fail

Contributor: oh well :)

// only select this automatically for indices that are created on or after 7.0; this will prevent the new behaviour
// until we have a fully upgraded cluster. Additionally it will make integration testing easier since mixed clusters
// will always have the behavior of the min node in the cluster.
//
// We use as a default number of routing shards the highest number that can be expressed
// as {@code numShards * 2^x} that is less than or equal to the maximum number of shards: 1024.
int log2MaxNumShards = 10; // logBase2(1024)
int log2NumShards = 32 - Integer.numberOfLeadingZeros(numShards - 1); // ceil(logBase2(numShards))
int numSplits = log2MaxNumShards - log2NumShards;
numSplits = Math.max(1, numSplits); // Ensure the index can be split at least once
return numShards * 1 << numSplits;
} else {
return numShards;
}
}
}
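
A quick illustration of the new default (hypothetical calls, not part of the PR): the formula above picks the largest numShards * 2^x that stays within 1024 routing shards, but always allows at least one split, and leaves pre-7.0 indices unchanged.

// Hypothetical calls; the expected results follow from the formula above.
int a = MetaDataCreateIndexService.calculateNumRoutingShards(1, Version.V_7_0_0_alpha1);    // 1 * 2^10 = 1024
int b = MetaDataCreateIndexService.calculateNumRoutingShards(5, Version.V_7_0_0_alpha1);    // 5 * 2^7  = 640
int c = MetaDataCreateIndexService.calculateNumRoutingShards(1024, Version.V_7_0_0_alpha1); // 1024 * 2^1 = 2048 (at least one split)
int d = MetaDataCreateIndexService.calculateNumRoutingShards(5, Version.V_6_0_0);           // 5, pre-7.0 indices keep their shard count
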
@@ -317,6 +317,7 @@ public void testInvalidPartitionSize() {
response = prepareCreate("test_" + shards + "_" + partitionSize)
.setSettings(Settings.builder()
.put("index.number_of_shards", shards)
.put("index.number_of_routing_shards", shards)
.put("index.routing_partition_size", partitionSize))
.execute().actionGet();
} catch (IllegalStateException | IllegalArgumentException e) {
@@ -39,6 +39,7 @@
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.Murmur3HashFunction;
import org.elasticsearch.cluster.routing.ShardRouting;
@@ -66,7 +67,6 @@
import java.util.List;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.IntFunction;
import java.util.stream.IntStream;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
@@ -88,21 +88,40 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
}

public void testCreateSplitIndexToN() throws IOException {
int[][] possibleShardSplits = new int[][] {{2,4,8}, {3, 6, 12}, {1, 2, 4}};
int[][] possibleShardSplits = new int[][]{{2, 4, 8}, {3, 6, 12}, {1, 2, 4}};
int[] shardSplits = randomFrom(possibleShardSplits);
assertEquals(shardSplits[0], (shardSplits[0] * shardSplits[1]) / shardSplits[1]);
assertEquals(shardSplits[1], (shardSplits[1] * shardSplits[2]) / shardSplits[2]);
splitToN(shardSplits[0], shardSplits[1], shardSplits[2]);
}

public void testSplitFromOneToN() {
splitToN(1, 5, 10);
client().admin().indices().prepareDelete("*").get();
int randomSplit = randomIntBetween(2, 6);
splitToN(1, randomSplit, randomSplit * 2);
}

private void splitToN(int sourceShards, int firstSplitShards, int secondSplitShards) {
Contributor: ❤️

Contributor Author: 😍

assertEquals(sourceShards, (sourceShards * firstSplitShards) / firstSplitShards);
assertEquals(firstSplitShards, (firstSplitShards * secondSplitShards) / secondSplitShards);
internalCluster().ensureAtLeastNumDataNodes(2);
final boolean useRouting = randomBoolean();
final boolean useNested = randomBoolean();
final boolean useMixedRouting = useRouting ? randomBoolean() : false;
CreateIndexRequestBuilder createInitialIndex = prepareCreate("source");
final int routingShards = shardSplits[2] * randomIntBetween(1, 10);
Settings.Builder settings = Settings.builder().put(indexSettings())
.put("number_of_shards", shardSplits[0])
.put("index.number_of_routing_shards", routingShards);

Contributor: Shouldn't we randomly still do this?

Contributor Author: we can.. I will do it

Contributor Author: ++

if (useRouting && useMixedRouting == false && randomBoolean()) {
settings.put("index.routing_partition_size", randomIntBetween(1, routingShards - 1));
Settings.Builder settings = Settings.builder().put(indexSettings()).put("number_of_shards", sourceShards);
final boolean useRoutingPartition;
if (randomBoolean()) {
// randomly set the value manually
int routingShards = secondSplitShards * randomIntBetween(1, 10);
settings.put("index.number_of_routing_shards", routingShards);
useRoutingPartition = false;
} else {
useRoutingPartition = randomBoolean();
}
if (useRouting && useMixedRouting == false && useRoutingPartition) {
settings.put("index.routing_partition_size",
randomIntBetween(1, MetaDataCreateIndexService.calculateNumRoutingShards(sourceShards, Version.CURRENT)-1));
if (useNested) {
createInitialIndex.addMapping("t1", "_routing", "required=true", "nested1", "type=nested");
} else {
@@ -172,11 +191,15 @@ public void testCreateSplitIndexToN() throws IOException {
.setSettings(Settings.builder()
.put("index.blocks.write", true)).get();
ensureGreen();
Settings.Builder firstSplitSettingsBuilder = Settings.builder()
.put("index.number_of_replicas", 0)
.put("index.number_of_shards", firstSplitShards);
if (sourceShards == 1 && useRoutingPartition == false && randomBoolean()) { // try to set it if we have a source index with 1 shard
Contributor: +1

firstSplitSettingsBuilder.put("index.number_of_routing_shards", secondSplitShards);
}
assertAcked(client().admin().indices().prepareResizeIndex("source", "first_split")
.setResizeType(ResizeType.SPLIT)
.setSettings(Settings.builder()
.put("index.number_of_replicas", 0)
.put("index.number_of_shards", shardSplits[1]).build()).get());
.setSettings(firstSplitSettingsBuilder.build()).get());
ensureGreen();
assertHitCount(client().prepareSearch("first_split").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), numDocs);

@@ -204,7 +227,7 @@
.setResizeType(ResizeType.SPLIT)
.setSettings(Settings.builder()
.put("index.number_of_replicas", 0)
.put("index.number_of_shards", shardSplits[2]).build()).get());
.put("index.number_of_shards", secondSplitShards).build()).get());
ensureGreen();
assertHitCount(client().prepareSearch("second_split").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), numDocs);
// let it be allocated anywhere and bump replicas
@@ -340,7 +363,6 @@ public void testCreateSplitIndex() {
prepareCreate("source").setSettings(Settings.builder().put(indexSettings())
.put("number_of_shards", 1)
.put("index.version.created", version)
.put("index.number_of_routing_shards", 2)
).get();
final int docs = randomIntBetween(0, 128);
for (int i = 0; i < docs; i++) {
@@ -443,7 +465,6 @@ public void testCreateSplitWithIndexSort() throws Exception {
Settings.builder()
.put(indexSettings())
.put("sort.field", "id")
.put("index.number_of_routing_shards", 16)
.put("sort.order", "desc")
.put("number_of_shards", 2)
.put("number_of_replicas", 0)