Skip to content

[8.7] Simulate shard moves using cluster_concurrent_rebalance=2 (#93977) #94082

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/93977.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 93977
summary: Simulate moves using cluster_concurrent_rebalance=2
area: Allocation
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,17 @@ public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation alloca

@Override
public Decision canRebalance(RoutingAllocation allocation) {
int relocatingShards = allocation.routingNodes().getRelocatingShardCount();
if (allocation.isSimulating() && relocatingShards >= 2) {
// BalancedShardsAllocator is prone to performing unnecessary moves when cluster_concurrent_rebalance is set to a high value (>2).
// (See https://github.com/elastic/elasticsearch/issues/87279)
// That allocator is used by DesiredBalanceComputer. Since no actual shard data is moved during the computation,
// the setting can be artificially capped at 2 to avoid unnecessary moves in the desired balance.
return allocation.decision(Decision.THROTTLE, NAME, "allocation should move one shard at the time when simulating");
}
if (clusterConcurrentRebalance == -1) {
return allocation.decision(Decision.YES, NAME, "unlimited concurrent rebalances are allowed");
}
int relocatingShards = allocation.routingNodes().getRelocatingShardCount();
if (relocatingShards >= clusterConcurrentRebalance) {
return allocation.decision(
Decision.THROTTLE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,27 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ESAllocationTestCase;
import org.elasticsearch.cluster.TestShardRoutingRoleStrategies;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.ShardId;

import java.util.Set;
import java.util.function.IntFunction;

import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
Expand Down Expand Up @@ -440,4 +451,82 @@ public void testBalanceAllNodesStartedAddIndex() {
assertThat(routingNodes.node("node2").shardsWithState("test1", STARTED).count(), equalTo(2L));
assertThat(routingNodes.node("node3").shardsWithState("test1", STARTED).count(), equalTo(2L));
}

/**
 * Checks that, with the desired balance allocator and {@code cluster_concurrent_rebalance} set to a value between 3 and 9,
 * rerouting a three-node cluster leaves the only shard of {@code index-2} on {@code node-2}.
 *
 * @see <a href="https://github.com/elastic/elasticsearch/issues/87279">elasticsearch issue #87279</a>
 */
public void testRebalanceShouldNotPerformUnnecessaryMovesWithMultipleConcurrentRebalances() {
    final Settings.Builder settingsBuilder = Settings.builder();
    settingsBuilder.put("cluster.routing.allocation.type", "desired_balance");
    settingsBuilder.put("cluster.routing.allocation.cluster_concurrent_rebalance", randomIntBetween(3, 9));
    final Settings settings = settingsBuilder.build();

    final var service = createAllocationService(settings);
    assertTrue("Only fixed in DesiredBalanceShardsAllocator", service.shardsAllocator instanceof DesiredBalanceShardsAllocator);

    // Three nodes: node-0, node-1 and node-2.
    final DiscoveryNodes.Builder nodes = DiscoveryNodes.builder();
    for (int i = 0; i < 3; i++) {
        nodes.add(newNode("node-" + i));
    }

    final Metadata.Builder metadata = Metadata.builder();
    final RoutingTable.Builder routingTable = RoutingTable.builder();

    // Shards 0 and 1 go to node-0, shards 2 and 3 go to node-1.
    final IntFunction<String> twoShardsPerNode = shardId -> "node-" + (shardId / 2);
    final IntFunction<String> alwaysNode2 = shardId -> "node-2";
    addIndex(metadata, routingTable, "index-0", 4, twoShardsPerNode);
    addIndex(metadata, routingTable, "index-1", 4, twoShardsPerNode);
    addIndex(metadata, routingTable, "index-2", 1, alwaysNode2);

    final ClusterState initialState = ClusterState.builder(ClusterName.DEFAULT)
        .nodes(nodes)
        .metadata(metadata)
        .routingTable(routingTable)
        .build();

    final ClusterState finalState = applyStartedShardsUntilNoChange(initialState, service);

    // node-0 and node-1 have 4 shards each. node-2 has only [index-2][0]. It should not be moved to achieve even balance.
    final var index2Primary = finalState.getRoutingTable().index("index-2").shard(0).primaryShard();
    assertThat(index2Primary.currentNodeId(), equalTo("node-2"));
}

/**
 * Adds an index with {@code numberOfShards} primaries and no replicas to both builders. Every primary is created in the
 * {@code STARTED} state on the node chosen by {@code assignmentFunction}, and its allocation id is recorded as the single
 * in-sync allocation id of that shard.
 *
 * @param metadataBuilder     receives the index metadata
 * @param routingTableBuilder receives the index routing table
 * @param indexName           name of the index to create
 * @param numberOfShards      number of primary shards
 * @param assignmentFunction  maps a shard number to the id of the node holding that shard
 */
private static void addIndex(
    Metadata.Builder metadataBuilder,
    RoutingTable.Builder routingTableBuilder,
    String indexName,
    int numberOfShards,
    IntFunction<String> assignmentFunction
) {
    // One random allocation id per shard, shared between the metadata and the routing entry below.
    final var allocationIds = randomList(numberOfShards, numberOfShards, () -> UUIDs.randomBase64UUID(random()));

    final Settings indexSettings = Settings.builder()
        .put("index.number_of_shards", numberOfShards)
        .put("index.number_of_replicas", 0)
        .put("index.version.created", Version.CURRENT)
        .build();
    final var indexMetadata = IndexMetadata.builder(indexName).settings(indexSettings);
    for (int shard = 0; shard < numberOfShards; shard++) {
        indexMetadata.putInSyncAllocationIds(shard, Set.of(allocationIds.get(shard)));
    }
    metadataBuilder.put(indexMetadata);

    // The index identity is read back from the metadata builder so the routing table refers to the same index.
    final var index = metadataBuilder.get(indexName).getIndex();
    final var shardRoutings = IndexRoutingTable.builder(index);
    for (int shard = 0; shard < numberOfShards; shard++) {
        final ShardId shardId = new ShardId(index, shard);
        final String nodeId = assignmentFunction.apply(shard);
        final AllocationId allocationId = AllocationId.newInitializing(allocationIds.get(shard));
        shardRoutings.addShard(TestShardRouting.newShardRouting(shardId, nodeId, null, true, ShardRoutingState.STARTED, allocationId));
    }

    routingTableBuilder.add(shardRoutings);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import static java.util.Collections.emptyMap;
import static org.elasticsearch.cluster.ClusterModule.BALANCED_ALLOCATOR;
import static org.elasticsearch.cluster.ClusterModule.DESIRED_BALANCE_ALLOCATOR;
import static org.elasticsearch.cluster.ClusterModule.SHARDS_ALLOCATOR_TYPE_SETTING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.common.settings.ClusterSettings.createBuiltInClusterSettings;

Expand Down Expand Up @@ -98,14 +99,20 @@ public static MockAllocationService createAllocationService(Settings settings, C
);
}

private static ShardsAllocator createShardsAllocator(Settings settings) {
return switch (randomFrom(BALANCED_ALLOCATOR, DESIRED_BALANCE_ALLOCATOR)) {
/**
 * Builds the shards allocator matching the type returned by {@code pickShardsAllocator(settings)}.
 *
 * @param settings settings used to pick the allocator type and passed on to the allocator being created
 * @return a balanced or desired-balance shards allocator
 * @throws AssertionError if the picked type is neither of the two known allocator types
 */
protected static ShardsAllocator createShardsAllocator(Settings settings) {
    final String allocatorType = pickShardsAllocator(settings);
    return switch (allocatorType) {
        case BALANCED_ALLOCATOR -> new BalancedShardsAllocator(settings);
        case DESIRED_BALANCE_ALLOCATOR -> createDesiredBalanceShardsAllocator(settings);
        default -> throw new AssertionError("Unknown allocator");
    };
}

/**
 * Chooses the allocator type: the value of {@code SHARDS_ALLOCATOR_TYPE_SETTING} when it is present in {@code settings},
 * otherwise a random pick between the balanced and the desired-balance allocator.
 */
private static String pickShardsAllocator(Settings settings) {
    if (SHARDS_ALLOCATOR_TYPE_SETTING.exists(settings) == false) {
        return randomFrom(BALANCED_ALLOCATOR, DESIRED_BALANCE_ALLOCATOR);
    }
    return SHARDS_ALLOCATOR_TYPE_SETTING.get(settings);
}

private static DesiredBalanceShardsAllocator createDesiredBalanceShardsAllocator(Settings settings) {
var queue = new DeterministicTaskQueue();
return new DesiredBalanceShardsAllocator(
Expand Down Expand Up @@ -337,7 +344,8 @@ protected static class MockAllocationService extends AllocationService {

private volatile long nanoTimeOverride = -1L;

private final GatewayAllocator gatewayAllocator;
public final GatewayAllocator gatewayAllocator;
public final ShardsAllocator shardsAllocator;

public MockAllocationService(
AllocationDeciders allocationDeciders,
Expand All @@ -355,6 +363,7 @@ public MockAllocationService(
TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY
);
this.gatewayAllocator = gatewayAllocator;
this.shardsAllocator = shardsAllocator;
}

public void setNanoTimeOverride(long nanoTime) {
Expand All @@ -365,10 +374,6 @@ public void setNanoTimeOverride(long nanoTime) {
protected long currentNanoTime() {
    // -1 is the field's initial value and means "no override": fall back to the superclass implementation.
    if (nanoTimeOverride == -1L) {
        return super.currentNanoTime();
    }
    return nanoTimeOverride;
}

/** Returns the gateway allocator stored on this service by its constructor. */
public GatewayAllocator getGatewayAllocator() {
    return this.gatewayAllocator;
}
}

/**
Expand Down