Skip to content

Commit 4185566

Browse files
committed
Add option to take currently relocating shards' sizes into account
When using the DiskThresholdDecider, shards may already be marked as relocating to the node being evaluated. This commit adds a new setting, `cluster.routing.allocation.disk.include_relocations`, which adds the size of the shards currently being relocated to a node to that node's used disk space. The new option defaults to `true`; however, it can over-estimate a node's usage when a relocation is already partially complete. For instance, a node receiving a 10gb shard that is 45% of the way through its relocation would be counted as using 10gb + (.45 * 10gb) = 14.5gb for that shard (the full shard size plus the 4.5gb already copied) before the watermarks are examined to see whether a new shard can be allocated. Fixes #7753. Relates to #6168.
1 parent 61c21f9 commit 4185566

File tree

4 files changed

+166
-2
lines changed

4 files changed

+166
-2
lines changed

docs/reference/index-modules/allocation.asciidoc

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,3 +137,12 @@ Both watermark settings can be changed dynamically using the cluster
137137
settings API. By default, Elasticsearch will retrieve information
138138
about the disk usage of the nodes every 30 seconds. This can also be
139139
changed by setting the `cluster.info.update.interval` setting.
140+
141+
By default, Elasticsearch will take into account shards that are currently being
142+
relocated to the target node when computing a node's disk usage. This can be
143+
changed by setting the `cluster.routing.allocation.disk.include_relocations`
144+
setting to `false` (defaults to `true`). Taking relocating shards' sizes into
145+
account may, however, mean that the disk usage for a node is incorrectly
146+
estimated on the high side, since the relocation could be 90% complete and a
147+
recently retrieved disk usage would include the total size of the relocating
148+
shard as well as the space already used by the running relocation.

src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java

Lines changed: 51 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,15 @@
2424
import org.elasticsearch.cluster.DiskUsage;
2525
import org.elasticsearch.cluster.routing.RoutingNode;
2626
import org.elasticsearch.cluster.routing.ShardRouting;
27+
import org.elasticsearch.cluster.routing.ShardRoutingState;
2728
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
2829
import org.elasticsearch.common.inject.Inject;
2930
import org.elasticsearch.common.settings.Settings;
3031
import org.elasticsearch.common.unit.ByteSizeValue;
3132
import org.elasticsearch.common.unit.RatioValue;
3233
import org.elasticsearch.node.settings.NodeSettingsService;
3334

35+
import java.util.List;
3436
import java.util.Map;
3537

3638
import static org.elasticsearch.cluster.InternalClusterInfoService.shardIdentifierFromRouting;
@@ -66,24 +68,32 @@ public class DiskThresholdDecider extends AllocationDecider {
6668
private volatile Double freeDiskThresholdHigh;
6769
private volatile ByteSizeValue freeBytesThresholdLow;
6870
private volatile ByteSizeValue freeBytesThresholdHigh;
71+
private volatile boolean includeRelocations;
6972
private volatile boolean enabled;
7073

7174
public static final String CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED = "cluster.routing.allocation.disk.threshold_enabled";
7275
public static final String CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK = "cluster.routing.allocation.disk.watermark.low";
7376
public static final String CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK = "cluster.routing.allocation.disk.watermark.high";
77+
public static final String CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS = "cluster.routing.allocation.disk.include_relocations";
7478

7579
class ApplySettings implements NodeSettingsService.Listener {
7680
@Override
7781
public void onRefreshSettings(Settings settings) {
7882
String newLowWatermark = settings.get(CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK, null);
7983
String newHighWatermark = settings.get(CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK, null);
84+
Boolean newRelocationsSetting = settings.getAsBoolean(CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS, null);
8085
Boolean newEnableSetting = settings.getAsBoolean(CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED, null);
8186

8287
if (newEnableSetting != null) {
8388
logger.info("updating [{}] from [{}] to [{}]", CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED,
8489
DiskThresholdDecider.this.enabled, newEnableSetting);
8590
DiskThresholdDecider.this.enabled = newEnableSetting;
8691
}
92+
if (newRelocationsSetting != null) {
93+
logger.info("updating [{}] from [{}] to [{}]", CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS,
94+
DiskThresholdDecider.this.includeRelocations, newRelocationsSetting);
95+
DiskThresholdDecider.this.includeRelocations = newRelocationsSetting;
96+
}
8797
if (newLowWatermark != null) {
8898
if (!validWatermarkSetting(newLowWatermark)) {
8999
throw new ElasticsearchParseException("Unable to parse low watermark: [" + newLowWatermark + "]");
@@ -125,11 +135,29 @@ public DiskThresholdDecider(Settings settings, NodeSettingsService nodeSettingsS
125135

126136
this.freeBytesThresholdLow = thresholdBytesFromWatermark(lowWatermark);
127137
this.freeBytesThresholdHigh = thresholdBytesFromWatermark(highWatermark);
138+
this.includeRelocations = settings.getAsBoolean(CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS, true);
128139

129140
this.enabled = settings.getAsBoolean(CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED, true);
130141
nodeSettingsService.addListener(new ApplySettings());
131142
}
132143

144+
/**
145+
* Returns the size of all shards that are currently being relocated to
146+
* the node, but may not be finished transfering yet.
147+
*/
148+
public long sizeOfRelocatingShards(RoutingNode node, RoutingAllocation allocation, Map<String, Long> shardSizes) {
149+
List<ShardRouting> relocatingShards = allocation.routingTable().shardsWithState(ShardRoutingState.RELOCATING);
150+
long totalSize = 0;
151+
for (ShardRouting routing : relocatingShards) {
152+
if (routing.relocatingNodeId().equals(node.nodeId())) {
153+
Long shardSize = shardSizes.get(shardIdentifierFromRouting(routing));
154+
shardSize = shardSize == null ? 0 : shardSize;
155+
totalSize += shardSize;
156+
}
157+
}
158+
return totalSize;
159+
}
160+
133161
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
134162

135163
// Always allow allocation if the decider is disabled
@@ -175,6 +203,16 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
175203
}
176204
}
177205

206+
if (includeRelocations) {
207+
long relocatingShardsSize = sizeOfRelocatingShards(node, allocation, shardSizes);
208+
DiskUsage usageIncludingRelocations = new DiskUsage(node.nodeId(), usage.getTotalBytes(), usage.getFreeBytes() - relocatingShardsSize);
209+
if (logger.isDebugEnabled()) {
210+
logger.debug("usage without relocations: {}", usage);
211+
logger.debug("usage with relocations: [{} bytes] {}", relocatingShardsSize, usageIncludingRelocations);
212+
}
213+
usage = usageIncludingRelocations;
214+
}
215+
178216
// First, check whether the node is currently over the low watermark
179217
double freeDiskPercentage = usage.getFreeDiskAsPercentage();
180218
long freeBytes = usage.getFreeBytes();
@@ -226,7 +264,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
226264
freeDiskThresholdLow, freeDiskPercentage, node.nodeId());
227265
}
228266
return allocation.decision(Decision.NO, NAME, "less than required [%s%%] free disk on node, free: [%s%%]",
229-
freeDiskThresholdLow, freeDiskThresholdLow);
267+
freeDiskThresholdLow, freeDiskPercentage);
230268
} else if (freeDiskPercentage > freeDiskThresholdHigh) {
231269
// Allow the shard to be allocated because it is primary that
232270
// has never been allocated if it's under the high watermark
@@ -245,7 +283,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
245283
freeDiskThresholdHigh, freeDiskPercentage, node.nodeId());
246284
}
247285
return allocation.decision(Decision.NO, NAME, "less than required [%s%%] free disk on node, free: [%s%%]",
248-
freeDiskThresholdLow, freeDiskThresholdLow);
286+
freeDiskThresholdLow, freeDiskPercentage);
249287
}
250288
}
251289

@@ -306,6 +344,17 @@ public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAl
306344
}
307345
}
308346

347+
if (includeRelocations) {
348+
Map<String, Long> shardSizes = clusterInfo.getShardSizes();
349+
long relocatingShardsSize = sizeOfRelocatingShards(node, allocation, shardSizes);
350+
DiskUsage usageIncludingRelocations = new DiskUsage(node.nodeId(), usage.getTotalBytes(), usage.getFreeBytes() - relocatingShardsSize);
351+
if (logger.isDebugEnabled()) {
352+
logger.debug("usage without relocations: {}", usage);
353+
logger.debug("usage with relocations: [{} bytes] {}", relocatingShardsSize, usageIncludingRelocations);
354+
}
355+
usage = usageIncludingRelocations;
356+
}
357+
309358
// If this node is already above the high threshold, the shard cannot remain (get it off!)
310359
double freeDiskPercentage = usage.getFreeDiskAsPercentage();
311360
long freeBytes = usage.getFreeBytes();

src/main/java/org/elasticsearch/cluster/settings/ClusterDynamicSettingsModule.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ public ClusterDynamicSettingsModule() {
8484
clusterDynamicSettings.addDynamicSetting(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK);
8585
clusterDynamicSettings.addDynamicSetting(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK);
8686
clusterDynamicSettings.addDynamicSetting(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED);
87+
clusterDynamicSettings.addDynamicSetting(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS);
8788
clusterDynamicSettings.addDynamicSetting(InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL, Validator.TIME);
8889
clusterDynamicSettings.addDynamicSetting(SnapshotInProgressAllocationDecider.CLUSTER_ROUTING_ALLOCATION_SNAPSHOT_RELOCATION_ENABLED);
8990
clusterDynamicSettings.addDynamicSetting(DestructiveOperations.REQUIRES_NAME);

src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import com.google.common.base.Predicate;
2323
import com.google.common.collect.ImmutableMap;
24+
import org.elasticsearch.ElasticsearchIllegalArgumentException;
2425
import org.elasticsearch.cluster.ClusterInfo;
2526
import org.elasticsearch.cluster.ClusterInfoService;
2627
import org.elasticsearch.cluster.ClusterState;
@@ -31,8 +32,12 @@
3132
import org.elasticsearch.cluster.routing.*;
3233
import org.elasticsearch.cluster.routing.allocation.AllocationService;
3334
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocators;
35+
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommand;
36+
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands;
37+
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
3438
import org.elasticsearch.common.settings.ImmutableSettings;
3539
import org.elasticsearch.common.settings.Settings;
40+
import org.elasticsearch.index.shard.ShardId;
3641
import org.elasticsearch.test.ElasticsearchAllocationTestCase;
3742
import org.elasticsearch.test.junit.annotations.TestLogging;
3843
import org.junit.Test;
@@ -649,6 +654,106 @@ public void freeDiskPercentageAfterShardAssignedUnitTest() {
649654
assertThat(after, equalTo(19.0));
650655
}
651656

657+
@Test
658+
public void testShardRelocationsTakenIntoAccount() {
659+
Settings diskSettings = settingsBuilder()
660+
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED, true)
661+
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS, true)
662+
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK, 0.7)
663+
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK, 0.8).build();
664+
665+
Map<String, DiskUsage> usages = new HashMap<>();
666+
usages.put("node1", new DiskUsage("node1", 100, 40)); // 60% used
667+
usages.put("node2", new DiskUsage("node2", 100, 40)); // 60% used
668+
usages.put("node2", new DiskUsage("node3", 100, 40)); // 60% used
669+
670+
Map<String, Long> shardSizes = new HashMap<>();
671+
shardSizes.put("[test][0][p]", 14L); // 14 bytes
672+
shardSizes.put("[test][0][r]", 14L);
673+
shardSizes.put("[test2][0][p]", 1L); // 1 bytes
674+
shardSizes.put("[test2][0][r]", 1L);
675+
final ClusterInfo clusterInfo = new ClusterInfo(ImmutableMap.copyOf(usages), ImmutableMap.copyOf(shardSizes));
676+
677+
AllocationDeciders deciders = new AllocationDeciders(ImmutableSettings.EMPTY,
678+
new HashSet<>(Arrays.asList(
679+
new SameShardAllocationDecider(ImmutableSettings.EMPTY),
680+
new DiskThresholdDecider(diskSettings))));
681+
682+
ClusterInfoService cis = new ClusterInfoService() {
683+
@Override
684+
public ClusterInfo getClusterInfo() {
685+
logger.info("--> calling fake getClusterInfo");
686+
return clusterInfo;
687+
}
688+
};
689+
690+
AllocationService strategy = new AllocationService(settingsBuilder()
691+
.put("cluster.routing.allocation.concurrent_recoveries", 10)
692+
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, "always")
693+
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
694+
.build(), deciders, new ShardsAllocators(), cis);
695+
696+
MetaData metaData = MetaData.builder()
697+
.put(IndexMetaData.builder("test").numberOfShards(1).numberOfReplicas(1))
698+
.put(IndexMetaData.builder("test2").numberOfShards(1).numberOfReplicas(1))
699+
.build();
700+
701+
RoutingTable routingTable = RoutingTable.builder()
702+
.addAsNew(metaData.index("test"))
703+
.addAsNew(metaData.index("test2"))
704+
.build();
705+
706+
ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build();
707+
708+
logger.info("--> adding two nodes");
709+
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder()
710+
.put(newNode("node1"))
711+
.put(newNode("node2"))
712+
).build();
713+
routingTable = strategy.reroute(clusterState).routingTable();
714+
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
715+
logShardStates(clusterState);
716+
717+
// shards should be initializing
718+
assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(4));
719+
720+
logger.info("--> start the shards");
721+
routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable();
722+
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
723+
724+
logShardStates(clusterState);
725+
// Assert that we're able to start the primary and replicas
726+
assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(4));
727+
728+
logger.info("--> adding node3");
729+
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes())
730+
.put(newNode("node3"))
731+
).build();
732+
733+
AllocationCommand relocate1 = new MoveAllocationCommand(new ShardId("test", 0), "node2", "node3");
734+
AllocationCommands cmds = new AllocationCommands(relocate1);
735+
736+
routingTable = strategy.reroute(clusterState, cmds).routingTable();
737+
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
738+
logShardStates(clusterState);
739+
740+
AllocationCommand relocate2 = new MoveAllocationCommand(new ShardId("test2", 0), "node2", "node3");
741+
cmds = new AllocationCommands(relocate2);
742+
743+
try {
744+
// The shard for the "test" index is already being relocated to
745+
// node3, which will put it over the low watermark when it
746+
// completes, with shard relocations taken into account this should
747+
// throw an exception about not being able to complete
748+
strategy.reroute(clusterState, cmds).routingTable();
749+
fail("should not have been able to reroute the shard");
750+
} catch (ElasticsearchIllegalArgumentException e) {
751+
assertThat("can't allocated because there isn't enough room: " + e.getMessage(),
752+
e.getMessage().contains("less than required [30.0%] free disk on node, free: [26.0%]"), equalTo(true));
753+
}
754+
755+
}
756+
652757
public void logShardStates(ClusterState state) {
653758
RoutingNodes rn = state.routingNodes();
654759
logger.info("--> counts: total: {}, unassigned: {}, initializing: {}, relocating: {}, started: {}",

0 commit comments

Comments
 (0)