
Commit b89bf9d

Handle negative free disk space in deciders (#48392)
Today it is possible that the total size of all relocating shards exceeds the total amount of free disk space: another user of the same disk may have increased their usage, or Elasticsearch may have double-counted relocations that are nearly complete, particularly when many relocations are in progress concurrently. The `DiskThresholdDecider` treats negative free space similarly to zero free space, but it then fails when rendering the messages that explain its decision. This commit fixes its handling of negative free space.

Fixes #48380
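To make the failure mode concrete: the decider estimates a node's free space by subtracting the expected size of every shard relocating onto the node from the free bytes most recently reported for its data path. A minimal sketch of that accounting, with illustrative names and numbers rather than the decider's actual fields:

    // Illustrative sketch of the accounting described above, not the decider's code.
    long reportedFreeBytes = 100L;        // free bytes last sampled for the node's data path
    long incomingRelocationBytes = 120L;  // estimated total size of shards relocating onto the node

    // A nearly-complete relocation is counted at its full estimated size even though
    // most of its bytes are already on disk, and so are already missing from
    // reportedFreeBytes. With enough concurrent relocations the estimate goes negative:
    long estimatedFreeBytes = reportedFreeBytes - incomingRelocationBytes; // -20
    assert estimatedFreeBytes < 0L;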
1 parent 451bcdc commit b89bf9d

File tree

3 files changed: +192 −44 lines changed

server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java

+93 −18
@@ -19,8 +19,6 @@
 
 package org.elasticsearch.cluster.routing.allocation.decider;
 
-import java.util.Set;
-
 import com.carrotsearch.hppc.cursors.ObjectCursor;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -42,6 +40,8 @@
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.shard.ShardId;
 
+import java.util.Set;
+
 import static org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING;
 import static org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING;
 
@@ -138,12 +138,24 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
 
         // subtractLeavingShards is passed as false here, because they still use disk space, and therefore we should be extra careful
         // and take the size into account
-        DiskUsage usage = getDiskUsage(node, allocation, usages, false);
+        final DiskUsageWithRelocations usage = getDiskUsage(node, allocation, usages, false);
         // First, check that the node currently over the low watermark
         double freeDiskPercentage = usage.getFreeDiskAsPercentage();
         // Cache the used disk percentage for displaying disk percentages consistent with documentation
         double usedDiskPercentage = usage.getUsedDiskAsPercentage();
         long freeBytes = usage.getFreeBytes();
+        if (freeBytes < 0L) {
+            final long sizeOfRelocatingShards = sizeOfRelocatingShards(node, allocation, false, usage.getPath());
+            logger.debug("fewer free bytes remaining than the size of all incoming shards: " +
+                "usage {} on node {} including {} bytes of relocations, preventing allocation",
+                usage, node.nodeId(), sizeOfRelocatingShards);
+
+            return allocation.decision(Decision.NO, NAME,
+                "the node has fewer free bytes remaining than the total size of all incoming shards: " +
+                "free space [%sB], relocating shards [%sB]",
+                freeBytes + sizeOfRelocatingShards, sizeOfRelocatingShards);
+        }
+
         ByteSizeValue freeBytesValue = new ByteSizeValue(freeBytes);
         if (logger.isTraceEnabled()) {
             logger.trace("node [{}] has {}% used disk", node.nodeId(), usedDiskPercentage);
@@ -240,6 +252,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
 
         // Secondly, check that allocating the shard to this node doesn't put it above the high watermark
         final long shardSize = getExpectedShardSize(shardRouting, allocation, 0);
+        assert shardSize >= 0 : shardSize;
         double freeSpaceAfterShard = freeDiskPercentageAfterShardAssigned(usage, shardSize);
         long freeBytesAfterShard = freeBytes - shardSize;
         if (freeBytesAfterShard < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes()) {
@@ -266,6 +279,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
                 diskThresholdSettings.getHighWatermarkRaw(), usedDiskThresholdHigh, freeSpaceAfterShard);
         }
 
+        assert freeBytesAfterShard >= 0 : freeBytesAfterShard;
         return allocation.decision(Decision.YES, NAME,
             "enough disk for shard on node, free: [%s], shard size: [%s], free after allocating shard: [%s]",
             freeBytesValue,
@@ -287,7 +301,7 @@ public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAl
 
         // subtractLeavingShards is passed as true here, since this is only for shards remaining, we will *eventually* have enough disk
         // since shards are moving away. No new shards will be incoming since in canAllocate we pass false for this check.
-        final DiskUsage usage = getDiskUsage(node, allocation, usages, true);
+        final DiskUsageWithRelocations usage = getDiskUsage(node, allocation, usages, true);
         final String dataPath = clusterInfo.getDataPath(shardRouting);
         // If this node is already above the high threshold, the shard cannot remain (get it off!)
         final double freeDiskPercentage = usage.getFreeDiskAsPercentage();
@@ -299,6 +313,16 @@ public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAl
             return allocation.decision(Decision.YES, NAME,
                 "this shard is not allocated on the most utilized disk and can remain");
         }
+        if (freeBytes < 0L) {
+            final long sizeOfRelocatingShards = sizeOfRelocatingShards(node, allocation, true, usage.getPath());
+            logger.debug("fewer free bytes remaining than the size of all incoming shards: " +
+                "usage {} on node {} including {} bytes of relocations, shard cannot remain",
+                usage, node.nodeId(), sizeOfRelocatingShards);
+            return allocation.decision(Decision.NO, NAME,
+                "the shard cannot remain on this node because the node has fewer free bytes remaining than the total size of all " +
+                "incoming shards: free space [%s], relocating shards [%s]",
+                freeBytes + sizeOfRelocatingShards, sizeOfRelocatingShards);
+        }
         if (freeBytes < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes()) {
             if (logger.isDebugEnabled()) {
                 logger.debug("less than the required {} free bytes threshold ({} bytes free) on node {}, shard cannot remain",
@@ -328,8 +352,8 @@ public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAl
                 "there is enough disk on this node for the shard to remain, free: [%s]", new ByteSizeValue(freeBytes));
     }
 
-    private DiskUsage getDiskUsage(RoutingNode node, RoutingAllocation allocation,
-                                   ImmutableOpenMap<String, DiskUsage> usages, boolean subtractLeavingShards) {
+    private DiskUsageWithRelocations getDiskUsage(RoutingNode node, RoutingAllocation allocation,
+                                                  ImmutableOpenMap<String, DiskUsage> usages, boolean subtractLeavingShards) {
         DiskUsage usage = usages.get(node.nodeId());
         if (usage == null) {
             // If there is no usage, and we have other nodes in the cluster,
@@ -340,18 +364,14 @@ private DiskUsage getDiskUsage(RoutingNode node, RoutingAllocation allocation,
                 node.nodeId(), usage.getTotalBytes(), usage.getFreeBytes(), usage.getFreeDiskAsPercentage());
             }
         }
-
-        if (diskThresholdSettings.includeRelocations()) {
-            long relocatingShardsSize = sizeOfRelocatingShards(node, allocation, subtractLeavingShards, usage.getPath());
-            DiskUsage usageIncludingRelocations = new DiskUsage(node.nodeId(), node.node().getName(), usage.getPath(),
-                usage.getTotalBytes(), usage.getFreeBytes() - relocatingShardsSize);
-            if (logger.isTraceEnabled()) {
-                logger.trace("usage without relocations: {}", usage);
-                logger.trace("usage with relocations: [{} bytes] {}", relocatingShardsSize, usageIncludingRelocations);
-            }
-            usage = usageIncludingRelocations;
+        final DiskUsageWithRelocations diskUsageWithRelocations = new DiskUsageWithRelocations(usage,
+            diskThresholdSettings.includeRelocations()
+                ? sizeOfRelocatingShards(node, allocation, subtractLeavingShards, usage.getPath()) : 0);
+        if (logger.isTraceEnabled()) {
+            logger.trace("getDiskUsage(subtractLeavingShards={}) returning {}", subtractLeavingShards, diskUsageWithRelocations);
         }
-        return usage;
+
+        return diskUsageWithRelocations;
     }
 
     /**
@@ -381,7 +401,7 @@ DiskUsage averageUsage(RoutingNode node, ImmutableOpenMap<String, DiskUsage> usa
      * @param shardSize Size in bytes of the shard
     * @return Percentage of free space after the shard is assigned to the node
     */
-    double freeDiskPercentageAfterShardAssigned(DiskUsage usage, Long shardSize) {
+    double freeDiskPercentageAfterShardAssigned(DiskUsageWithRelocations usage, Long shardSize) {
         shardSize = (shardSize == null) ? 0 : shardSize;
         DiskUsage newUsage = new DiskUsage(usage.getNodeId(), usage.getNodeName(), usage.getPath(),
             usage.getTotalBytes(), usage.getFreeBytes() - shardSize);
@@ -450,4 +470,59 @@ public static long getExpectedShardSize(ShardRouting shard, RoutingAllocation al
         }
 
     }
+
+    static class DiskUsageWithRelocations {
+
+        private final DiskUsage diskUsage;
+        private final long relocatingShardSize;
+
+        DiskUsageWithRelocations(DiskUsage diskUsage, long relocatingShardSize) {
+            this.diskUsage = diskUsage;
+            this.relocatingShardSize = relocatingShardSize;
+        }
+
+        @Override
+        public String toString() {
+            return "DiskUsageWithRelocations{" +
+                "diskUsage=" + diskUsage +
+                ", relocatingShardSize=" + relocatingShardSize +
+                '}';
+        }
+
+        double getFreeDiskAsPercentage() {
+            if (getTotalBytes() == 0L) {
+                return 100.0;
+            }
+            return 100.0 * ((double)getFreeBytes() / getTotalBytes());
+        }
+
+        double getUsedDiskAsPercentage() {
+            return 100.0 - getFreeDiskAsPercentage();
+        }
+
+        long getFreeBytes() {
+            try {
+                return Math.subtractExact(diskUsage.getFreeBytes(), relocatingShardSize);
+            } catch (ArithmeticException e) {
+                return Long.MAX_VALUE;
+            }
+        }
+
+        String getPath() {
+            return diskUsage.getPath();
+        }
+
+        String getNodeId() {
+            return diskUsage.getNodeId();
+        }
+
+        String getNodeName() {
+            return diskUsage.getNodeName();
+        }
+
+        long getTotalBytes() {
+            return diskUsage.getTotalBytes();
+        }
+    }
+
 }
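A note on the new `DiskUsageWithRelocations.getFreeBytes()` above: `Math.subtractExact` throws `ArithmeticException` on `long` overflow instead of silently wrapping, which is what allows the catch clause to clamp the result rather than return a nonsense value. A small standalone demonstration of that JDK behaviour (the class name is invented for this example):

    public class SubtractExactDemo {
        public static void main(String[] args) {
            // Plain subtraction wraps around silently on overflow:
            System.out.println(Long.MIN_VALUE - 1L); // prints 9223372036854775807

            // Math.subtractExact surfaces the overflow as an exception instead,
            // which getFreeBytes() catches and clamps:
            try {
                Math.subtractExact(Long.MIN_VALUE, 1L);
            } catch (ArithmeticException e) {
                System.out.println("caught: " + e.getMessage()); // prints "caught: long overflow"
            }
        }
    }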

server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java

+69 −26
@@ -55,13 +55,15 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.singleton;
 import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
 import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
 import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
 import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED;
+import static org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.nullValue;
@@ -645,7 +647,8 @@ public void testFreeDiskPercentageAfterShardAssigned() {
         usages.put("node2", new DiskUsage("node2", "n2", "/dev/null", 100, 50)); // 50% used
         usages.put("node3", new DiskUsage("node3", "n3", "/dev/null", 100, 0)); // 100% used
 
-        Double after = decider.freeDiskPercentageAfterShardAssigned(new DiskUsage("node2", "n2", "/dev/null", 100, 30), 11L);
+        Double after = decider.freeDiskPercentageAfterShardAssigned(
+            new DiskThresholdDecider.DiskUsageWithRelocations(new DiskUsage("node2", "n2", "/dev/null", 100, 30), 0L), 11L);
         assertThat(after, equalTo(19.0));
     }
 
@@ -671,21 +674,25 @@ public void testShardRelocationsTakenIntoAccount() {
         final ClusterInfo clusterInfo = new DevNullClusterInfo(usages, usages, shardSizes);
 
         DiskThresholdDecider decider = makeDecider(diskSettings);
+        final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
         AllocationDeciders deciders = new AllocationDeciders(
-            new HashSet<>(Arrays.asList(new SameShardAllocationDecider(
-                Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
-            ), decider)));
+            new HashSet<>(Arrays.asList(
+                new SameShardAllocationDecider(Settings.EMPTY, clusterSettings),
+                new EnableAllocationDecider(
+                    Settings.builder().put(CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), "none").build(), clusterSettings),
+                decider)));
 
+        final AtomicReference<ClusterInfo> clusterInfoReference = new AtomicReference<>(clusterInfo);
         ClusterInfoService cis = new ClusterInfoService() {
             @Override
             public ClusterInfo getClusterInfo() {
                 logger.info("--> calling fake getClusterInfo");
-                return clusterInfo;
+                return clusterInfoReference.get();
             }
         };
 
         AllocationService strategy = new AllocationService(deciders, new TestGatewayAllocator(),
-                new BalancedShardsAllocator(Settings.EMPTY), cis);
+            new BalancedShardsAllocator(Settings.EMPTY), cis);
 
         MetaData metaData = MetaData.builder()
             .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
@@ -723,30 +730,66 @@ public ClusterInfo getClusterInfo() {
                 .add(newNode("node3"))
         ).build();
 
-        AllocationCommand relocate1 = new MoveAllocationCommand("test", 0, "node2", "node3");
-        AllocationCommands cmds = new AllocationCommands(relocate1);
+        {
+            AllocationCommand moveAllocationCommand = new MoveAllocationCommand("test", 0, "node2", "node3");
+            AllocationCommands cmds = new AllocationCommands(moveAllocationCommand);
 
-        clusterState = strategy.reroute(clusterState, cmds, false, false).getClusterState();
-        logShardStates(clusterState);
+            clusterState = strategy.reroute(clusterState, cmds, false, false).getClusterState();
+            logShardStates(clusterState);
+        }
+
+        final ImmutableOpenMap.Builder<String, DiskUsage> overfullUsagesBuilder = ImmutableOpenMap.builder();
+        overfullUsagesBuilder.put("node1", new DiskUsage("node1", "n1", "/dev/null", 100, 40)); // 60% used
+        overfullUsagesBuilder.put("node2", new DiskUsage("node2", "n2", "/dev/null", 100, 40)); // 60% used
+        overfullUsagesBuilder.put("node3", new DiskUsage("node3", "n3", "/dev/null", 100, 0)); // 100% used
+        final ImmutableOpenMap<String, DiskUsage> overfullUsages = overfullUsagesBuilder.build();
+
+        final ImmutableOpenMap.Builder<String, Long> largerShardSizesBuilder = ImmutableOpenMap.builder();
+        largerShardSizesBuilder.put("[test][0][p]", 14L);
+        largerShardSizesBuilder.put("[test][0][r]", 14L);
+        largerShardSizesBuilder.put("[test2][0][p]", 2L);
+        largerShardSizesBuilder.put("[test2][0][r]", 2L);
+        final ImmutableOpenMap<String, Long> largerShardSizes = largerShardSizesBuilder.build();
+
+        final ClusterInfo overfullClusterInfo = new DevNullClusterInfo(overfullUsages, overfullUsages, largerShardSizes);
 
-        AllocationCommand relocate2 = new MoveAllocationCommand("test2", 0, "node2", "node3");
-        cmds = new AllocationCommands(relocate2);
-
-        try {
-            // The shard for the "test" index is already being relocated to
-            // node3, which will put it over the low watermark when it
-            // completes, with shard relocations taken into account this should
-            // throw an exception about not being able to complete
-            strategy.reroute(clusterState, cmds, false, false);
-            fail("should not have been able to reroute the shard");
-        } catch (IllegalArgumentException e) {
-            assertThat("can't be allocated because there isn't enough room: " + e.getMessage(),
-                e.getMessage(),
-                containsString("the node is above the low watermark cluster setting " +
-                    "[cluster.routing.allocation.disk.watermark.low=0.7], using more disk space than the maximum " +
-                    "allowed [70.0%], actual free: [26.0%]"));
+        {
+            AllocationCommand moveAllocationCommand = new MoveAllocationCommand("test2", 0, "node2", "node3");
+            AllocationCommands cmds = new AllocationCommands(moveAllocationCommand);
+
+            final ClusterState clusterStateThatRejectsCommands = clusterState;
+
+            assertThat(expectThrows(IllegalArgumentException.class,
+                () -> strategy.reroute(clusterStateThatRejectsCommands, cmds, false, false)).getMessage(),
+                containsString("the node is above the low watermark cluster setting " +
+                    "[cluster.routing.allocation.disk.watermark.low=0.7], using more disk space than the maximum " +
+                    "allowed [70.0%], actual free: [26.0%]"));
+
+            clusterInfoReference.set(overfullClusterInfo);
+
+            assertThat(expectThrows(IllegalArgumentException.class,
+                () -> strategy.reroute(clusterStateThatRejectsCommands, cmds, false, false)).getMessage(),
+                containsString("the node has fewer free bytes remaining than the total size of all incoming shards"));
+
+            clusterInfoReference.set(clusterInfo);
         }
 
+        {
+            AllocationCommand moveAllocationCommand = new MoveAllocationCommand("test2", 0, "node2", "node3");
+            AllocationCommands cmds = new AllocationCommands(moveAllocationCommand);
+
+            logger.info("--> before starting: {}", clusterState);
+            clusterState = strategy.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING));
+            logger.info("--> after starting: {}", clusterState);
+            clusterState = strategy.reroute(clusterState, cmds, false, false).getClusterState();
+            logger.info("--> after running another command: {}", clusterState);
+            logShardStates(clusterState);
+
+            clusterInfoReference.set(overfullClusterInfo);
+
+            clusterState = strategy.reroute(clusterState, "foo");
+            logger.info("--> after another reroute: {}", clusterState);
        }
     }
 
     public void testCanRemainWithShardRelocatingAway() {

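The `AtomicReference<ClusterInfo>` introduced in this test is a general trick: the anonymous `ClusterInfoService` re-reads the reference on every call, so the test can swap in `overfullClusterInfo` mid-test without rebuilding the `AllocationService`. A minimal sketch of the same pattern, using hypothetical names in place of the Elasticsearch types:

    import java.util.concurrent.atomic.AtomicReference;

    public class SwappableFakeDemo {
        // Hypothetical stand-in for ClusterInfoService: a single method, so a
        // method reference can implement it.
        interface InfoService {
            String getInfo();
        }

        public static void main(String[] args) {
            AtomicReference<String> current = new AtomicReference<>("plenty of disk free");
            InfoService service = current::get; // the fake always returns the latest value

            System.out.println(service.getInfo()); // plenty of disk free
            current.set("node is overfull");       // swap the answer mid-test
            System.out.println(service.getInfo()); // node is overfull
        }
    }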