Skip to content

Handle negative free disk space in deciders #48392

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,19 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
// Cache the used disk percentage for displaying disk percentages consistent with documentation
double usedDiskPercentage = usage.getUsedDiskAsPercentage();
long freeBytes = usage.getFreeBytes();
if (freeBytes < 0L) {
final long sizeOfRelocatingShards = sizeOfRelocatingShards(node, false, usage.getPath(),
allocation.clusterInfo(), allocation.metaData(), allocation.routingTable());
logger.debug("fewer free bytes remaining than the size of all incoming shards: " +
"usage {} on node {} including {} bytes of relocations, preventing allocation",
usage, node.nodeId(), sizeOfRelocatingShards);

return allocation.decision(Decision.NO, NAME,
"the node has fewer free bytes remaining than the total size of all incoming shards: " +
"free space [%sB], relocating shards [%sB]",
freeBytes + sizeOfRelocatingShards, sizeOfRelocatingShards);
}

ByteSizeValue freeBytesValue = new ByteSizeValue(freeBytes);
if (logger.isTraceEnabled()) {
logger.trace("node [{}] has {}% used disk", node.nodeId(), usedDiskPercentage);
Expand Down Expand Up @@ -242,6 +255,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
// Secondly, check that allocating the shard to this node doesn't put it above the high watermark
final long shardSize = getExpectedShardSize(shardRouting, 0L,
allocation.clusterInfo(), allocation.metaData(), allocation.routingTable());
assert shardSize >= 0 : shardSize;
double freeSpaceAfterShard = freeDiskPercentageAfterShardAssigned(usage, shardSize);
long freeBytesAfterShard = freeBytes - shardSize;
if (freeBytesAfterShard < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes()) {
Expand All @@ -268,6 +282,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
diskThresholdSettings.getHighWatermarkRaw(), usedDiskThresholdHigh, freeSpaceAfterShard);
}

assert freeBytesAfterShard >= 0 : freeBytesAfterShard;
return allocation.decision(Decision.YES, NAME,
"enough disk for shard on node, free: [%s], shard size: [%s], free after allocating shard: [%s]",
freeBytesValue,
Expand Down Expand Up @@ -301,6 +316,17 @@ public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAl
return allocation.decision(Decision.YES, NAME,
"this shard is not allocated on the most utilized disk and can remain");
}
if (freeBytes < 0L) {
final long sizeOfRelocatingShards = sizeOfRelocatingShards(node, false, usage.getPath(),
allocation.clusterInfo(), allocation.metaData(), allocation.routingTable());
logger.debug("fewer free bytes remaining than the size of all incoming shards: " +
"usage {} on node {} including {} bytes of relocations, shard cannot remain",
usage, node.nodeId(), sizeOfRelocatingShards);
return allocation.decision(Decision.NO, NAME,
"the shard cannot remain on this node because the node has fewer free bytes remaining than the total size of all " +
"incoming shards: free space [%s], relocating shards [%s]",
freeBytes + sizeOfRelocatingShards, sizeOfRelocatingShards);
}
if (freeBytes < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes()) {
if (logger.isDebugEnabled()) {
logger.debug("less than the required {} free bytes threshold ({} bytes free) on node {}, shard cannot remain",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,15 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

import static java.util.Collections.emptyMap;
import static java.util.Collections.singleton;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED;
import static org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
Expand Down Expand Up @@ -653,18 +655,19 @@ public void testShardRelocationsTakenIntoAccount() {
final ClusterInfo clusterInfo = new DevNullClusterInfo(usages, usages, shardSizes);

DiskThresholdDecider decider = makeDecider(diskSettings);
final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
AllocationDeciders deciders = new AllocationDeciders(
new HashSet<>(Arrays.asList(new SameShardAllocationDecider(
Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
), decider)));
new HashSet<>(Arrays.asList(
new SameShardAllocationDecider(Settings.EMPTY, clusterSettings),
new EnableAllocationDecider(
Settings.builder().put(CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), "none").build(), clusterSettings),
decider)));

ClusterInfoService cis = () -> {
logger.info("--> calling fake getClusterInfo");
return clusterInfo;
};
final AtomicReference<ClusterInfo> clusterInfoReference = new AtomicReference<>(clusterInfo);
final ClusterInfoService cis = clusterInfoReference::get;

AllocationService strategy = new AllocationService(deciders, new TestGatewayAllocator(),
new BalancedShardsAllocator(Settings.EMPTY), cis);
new BalancedShardsAllocator(Settings.EMPTY), cis);

MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
Expand Down Expand Up @@ -702,30 +705,66 @@ Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLU
.add(newNode("node3"))
).build();

AllocationCommand relocate1 = new MoveAllocationCommand("test", 0, "node2", "node3");
AllocationCommands cmds = new AllocationCommands(relocate1);
{
AllocationCommand moveAllocationCommand = new MoveAllocationCommand("test", 0, "node2", "node3");
AllocationCommands cmds = new AllocationCommands(moveAllocationCommand);

clusterState = strategy.reroute(clusterState, cmds, false, false).getClusterState();
logShardStates(clusterState);
clusterState = strategy.reroute(clusterState, cmds, false, false).getClusterState();
logShardStates(clusterState);
}

final ImmutableOpenMap.Builder<String, DiskUsage> overfullUsagesBuilder = ImmutableOpenMap.builder();
overfullUsagesBuilder.put("node1", new DiskUsage("node1", "n1", "/dev/null", 100, 40)); // 60% used
overfullUsagesBuilder.put("node2", new DiskUsage("node2", "n2", "/dev/null", 100, 40)); // 60% used
overfullUsagesBuilder.put("node3", new DiskUsage("node3", "n3", "/dev/null", 100, 0)); // 100% used
final ImmutableOpenMap<String, DiskUsage> overfullUsages = overfullUsagesBuilder.build();

final ImmutableOpenMap.Builder<String, Long> largerShardSizesBuilder = ImmutableOpenMap.builder();
largerShardSizesBuilder.put("[test][0][p]", 14L);
largerShardSizesBuilder.put("[test][0][r]", 14L);
largerShardSizesBuilder.put("[test2][0][p]", 2L);
largerShardSizesBuilder.put("[test2][0][r]", 2L);
final ImmutableOpenMap<String, Long> largerShardSizes = largerShardSizesBuilder.build();

AllocationCommand relocate2 = new MoveAllocationCommand("test2", 0, "node2", "node3");
cmds = new AllocationCommands(relocate2);

try {
// The shard for the "test" index is already being relocated to
// node3, which will put it over the low watermark when it
// completes, with shard relocations taken into account this should
// throw an exception about not being able to complete
strategy.reroute(clusterState, cmds, false, false);
fail("should not have been able to reroute the shard");
} catch (IllegalArgumentException e) {
assertThat("can't be allocated because there isn't enough room: " + e.getMessage(),
e.getMessage(),
containsString("the node is above the low watermark cluster setting " +
"[cluster.routing.allocation.disk.watermark.low=0.7], using more disk space than the maximum " +
"allowed [70.0%], actual free: [26.0%]"));
final ClusterInfo overfullClusterInfo = new DevNullClusterInfo(overfullUsages, overfullUsages, largerShardSizes);

{
AllocationCommand moveAllocationCommand = new MoveAllocationCommand("test2", 0, "node2", "node3");
AllocationCommands cmds = new AllocationCommands(moveAllocationCommand);

final ClusterState clusterStateThatRejectsCommands = clusterState;

assertThat(expectThrows(IllegalArgumentException.class,
() -> strategy.reroute(clusterStateThatRejectsCommands, cmds, false, false)).getMessage(),
containsString("the node is above the low watermark cluster setting " +
"[cluster.routing.allocation.disk.watermark.low=0.7], using more disk space than the maximum " +
"allowed [70.0%], actual free: [26.0%]"));

clusterInfoReference.set(overfullClusterInfo);

assertThat(expectThrows(IllegalArgumentException.class,
() -> strategy.reroute(clusterStateThatRejectsCommands, cmds, false, false)).getMessage(),
containsString("the node has fewer free bytes remaining than the total size of all incoming shards"));

clusterInfoReference.set(clusterInfo);
}

{
AllocationCommand moveAllocationCommand = new MoveAllocationCommand("test2", 0, "node2", "node3");
AllocationCommands cmds = new AllocationCommands(moveAllocationCommand);

logger.info("--> before starting: {}", clusterState);
clusterState = startInitializingShardsAndReroute(strategy, clusterState);
logger.info("--> after starting: {}", clusterState);
clusterState = strategy.reroute(clusterState, cmds, false, false).getClusterState();
logger.info("--> after running another command: {}", clusterState);
logShardStates(clusterState);

clusterInfoReference.set(overfullClusterInfo);

clusterState = strategy.reroute(clusterState, "foo");
logger.info("--> after another reroute: {}", clusterState);
}
}

public void testCanRemainWithShardRelocatingAway() {
Expand Down