
Commit a8bcbbc

Quieter logging from the DiskThresholdMonitor (#48115)
Today if an Elasticsearch node reaches a disk watermark then it will repeatedly emit logging about it, which implies that some action needs to be taken by the administrator. This is misleading. Elasticsearch strives to keep nodes under the high watermark, but it is normal to have a few nodes occasionally exceed this level. Nodes may be over the low watermark for an extended period without any ill effects. This commit enhances the logging emitted by the `DiskThresholdMonitor` to be less misleading. The expected case of hitting the high watermark and immediately relocating one or more shards to bring the node back under the watermark again is reduced in severity to `INFO`. Additionally, `INFO` messages are not emitted repeatedly. Fixes #48038
1 parent 5e4dd0f commit a8bcbbc
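The `DiskThresholdMonitor` diff that implements this change is not rendered below, so purely as an illustration, here is a minimal, self-contained sketch of the general "log only on state change" pattern the commit message describes. It uses plain JDK logging and hypothetical names (`WatermarkLogSketch`, `onMonitoringPass`); it is not the actual Elasticsearch implementation.

import java.util.HashMap;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical sketch of "quieter" watermark logging: emit a message only when a
// node's watermark state changes, and reserve WARNING for the flood stage.
class WatermarkLogSketch {

    enum WatermarkState { BELOW_LOW, OVER_LOW, OVER_HIGH, OVER_FLOOD_STAGE }

    private static final Logger logger = Logger.getLogger(WatermarkLogSketch.class.getName());

    // last state logged for each node; used to suppress repeated identical messages
    private final Map<String, WatermarkState> lastLoggedState = new HashMap<>();

    void onMonitoringPass(String nodeId, WatermarkState currentState) {
        final WatermarkState previous = lastLoggedState.put(nodeId, currentState);
        if (previous == currentState) {
            return; // unchanged since the last pass: stay quiet instead of repeating the message
        }
        // The expected case (briefly exceeding the high watermark) stays at INFO;
        // only the flood stage is escalated to WARNING in this sketch.
        final Level level = currentState == WatermarkState.OVER_FLOOD_STAGE ? Level.WARNING : Level.INFO;
        logger.log(level, "node [" + nodeId + "] is now " + currentState);
    }

    public static void main(String[] args) {
        WatermarkLogSketch sketch = new WatermarkLogSketch();
        sketch.onMonitoringPass("node-1", WatermarkState.OVER_HIGH);  // logged at INFO
        sketch.onMonitoringPass("node-1", WatermarkState.OVER_HIGH);  // suppressed, same state
        sketch.onMonitoringPass("node-1", WatermarkState.BELOW_LOW);  // logged again on change
    }
}

Running `main` logs the first `OVER_HIGH` message at `INFO`, suppresses the identical second pass, and logs again only when the node's state changes.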

File tree

10 files changed (+454 -111 lines)

server/src/main/java/org/elasticsearch/cluster/routing/BatchedRerouteService.java

+4-4
@@ -50,7 +50,7 @@ public class BatchedRerouteService implements RerouteService {
 
     private final Object mutex = new Object();
     @Nullable // null if no reroute is currently pending
-    private List<ActionListener<Void>> pendingRerouteListeners;
+    private List<ActionListener<ClusterState>> pendingRerouteListeners;
     private Priority pendingTaskPriority = Priority.LANGUID;
 
     /**
@@ -65,8 +65,8 @@ public BatchedRerouteService(ClusterService clusterService, BiFunction<ClusterSt
      * Initiates a reroute.
      */
     @Override
-    public final void reroute(String reason, Priority priority, ActionListener<Void> listener) {
-        final List<ActionListener<Void>> currentListeners;
+    public final void reroute(String reason, Priority priority, ActionListener<ClusterState> listener) {
+        final List<ActionListener<ClusterState>> currentListeners;
         synchronized (mutex) {
             if (pendingRerouteListeners != null) {
                 if (priority.sameOrAfter(pendingTaskPriority)) {
@@ -148,7 +148,7 @@ public void onFailure(String source, Exception e) {
 
                 @Override
                 public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
-                    ActionListener.onResponse(currentListeners, null);
+                    ActionListener.onResponse(currentListeners, newState);
                 }
             });
         } catch (Exception e) {

server/src/main/java/org/elasticsearch/cluster/routing/RerouteService.java

+2-1
@@ -19,6 +19,7 @@
 package org.elasticsearch.cluster.routing;
 
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.common.Priority;
 
 /**
@@ -33,5 +34,5 @@ public interface RerouteService {
      * this reroute is batched with the pending one; if there is already a pending reroute at a lower priority then
      * the priority of the pending batch is raised to the given priority.
      */
-    void reroute(String reason, Priority priority, ActionListener<Void> listener);
+    void reroute(String reason, Priority priority, ActionListener<ClusterState> listener);
 }
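This signature change means reroute listeners are now completed with the cluster state produced by the batched reroute rather than `null`. As a hedged illustration only (the caller class, reason string, and log messages here are hypothetical; only `RerouteService#reroute`, `Priority`, and the `ActionListener.wrap` helper are assumed from this code base), a caller might use the new listener like this:

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.routing.RerouteService;
import org.elasticsearch.common.Priority;

// Hypothetical caller illustrating the new ActionListener<ClusterState> signature.
class RerouteCallerSketch {

    private static final Logger logger = LogManager.getLogger(RerouteCallerSketch.class);

    void triggerReroute(RerouteService rerouteService) {
        rerouteService.reroute("example reason", Priority.HIGH, ActionListener.wrap(
            // the listener now receives the cluster state that resulted from the batched reroute
            clusterState -> logger.trace("reroute applied, cluster state version [{}]", clusterState.version()),
            e -> logger.debug("reroute failed", e)));
    }
}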

server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java

+128-55
Large diffs are not rendered by default.

server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdSettings.java

+19
@@ -20,6 +20,7 @@
 package org.elasticsearch.cluster.routing.allocation;
 
 import org.elasticsearch.ElasticsearchParseException;
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
@@ -321,6 +322,24 @@ public TimeValue getRerouteInterval() {
         return rerouteInterval;
     }
 
+    String describeLowThreshold() {
+        return freeBytesThresholdLow.equals(ByteSizeValue.ZERO)
+            ? Strings.format1Decimals(100.0 - freeDiskThresholdLow, "%")
+            : freeBytesThresholdLow.toString();
+    }
+
+    String describeHighThreshold() {
+        return freeBytesThresholdHigh.equals(ByteSizeValue.ZERO)
+            ? Strings.format1Decimals(100.0 - freeDiskThresholdHigh, "%")
+            : freeBytesThresholdHigh.toString();
+    }
+
+    String describeFloodStageThreshold() {
+        return freeBytesThresholdFloodStage.equals(ByteSizeValue.ZERO)
+            ? Strings.format1Decimals(100.0 - freeDiskThresholdFloodStage, "%")
+            : freeBytesThresholdFloodStage.toString();
+    }
+
     /**
      * Attempts to parse the watermark into a percentage, returning 100.0% if
      * it cannot be parsed.

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java

+6-4
@@ -817,8 +817,9 @@ private void allocateUnassigned() {
                     logger.trace("Assigned shard [{}] to [{}]", shard, minNode.getNodeId());
                 }
 
-                final long shardSize = DiskThresholdDecider.getExpectedShardSize(shard, allocation,
-                    ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
+                final long shardSize = DiskThresholdDecider.getExpectedShardSize(shard,
+                    ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE,
+                    allocation.clusterInfo(), allocation.metaData(), allocation.routingTable());
                 shard = routingNodes.initializeShard(shard, minNode.getNodeId(), null, shardSize, allocation.changes());
                 minNode.addShard(shard);
                 if (!shard.primary()) {
@@ -838,8 +839,9 @@ private void allocateUnassigned() {
             if (minNode != null) {
                 // throttle decision scenario
                 assert allocationDecision.getAllocationStatus() == AllocationStatus.DECIDERS_THROTTLED;
-                final long shardSize = DiskThresholdDecider.getExpectedShardSize(shard, allocation,
-                    ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
+                final long shardSize = DiskThresholdDecider.getExpectedShardSize(shard,
+                    ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE,
+                    allocation.clusterInfo(), allocation.metaData(), allocation.routingTable());
                 minNode.addShard(shard.initialize(minNode.getNodeId(), null, shardSize));
                 final RoutingNode node = minNode.getRoutingNode();
                 final Decision.Type nodeLevelDecision = deciders.canAllocate(node, allocation).type();

server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java

+21-19
@@ -27,9 +27,11 @@
 import org.elasticsearch.cluster.ClusterInfo;
 import org.elasticsearch.cluster.DiskUsage;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
 import org.elasticsearch.cluster.routing.RecoverySource;
 import org.elasticsearch.cluster.routing.RoutingNode;
+import org.elasticsearch.cluster.routing.RoutingTable;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.ShardRoutingState;
 import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
@@ -86,10 +88,9 @@ public DiskThresholdDecider(Settings settings, ClusterSettings clusterSettings)
      *
      * If subtractShardsMovingAway is true then the size of shards moving away is subtracted from the total size of all shards
      */
-    static long sizeOfRelocatingShards(RoutingNode node, RoutingAllocation allocation,
-                                       boolean subtractShardsMovingAway, String dataPath) {
-        ClusterInfo clusterInfo = allocation.clusterInfo();
-        long totalSize = 0;
+    public static long sizeOfRelocatingShards(RoutingNode node, boolean subtractShardsMovingAway, String dataPath, ClusterInfo clusterInfo,
+                                              MetaData metaData, RoutingTable routingTable) {
+        long totalSize = 0L;
 
         for (ShardRouting routing : node.shardsWithState(ShardRoutingState.INITIALIZING)) {
             if (routing.relocatingNodeId() == null) {
@@ -103,7 +104,7 @@ static long sizeOfRelocatingShards(RoutingNode node, RoutingAllocatio
             // if we don't yet know the actual path of the incoming shard then conservatively assume it's going to the path with the least
             // free space
             if (actualPath == null || actualPath.equals(dataPath)) {
-                totalSize += getExpectedShardSize(routing, allocation, 0);
+                totalSize += getExpectedShardSize(routing, 0L, clusterInfo, metaData, routingTable);
             }
         }
 
@@ -115,7 +116,7 @@ static long sizeOfRelocatingShards(RoutingNode node, RoutingAllocatio
                 actualPath = clusterInfo.getDataPath(routing.cancelRelocation());
             }
             if (dataPath.equals(actualPath)) {
-                totalSize -= getExpectedShardSize(routing, allocation, 0);
+                totalSize -= getExpectedShardSize(routing, 0L, clusterInfo, metaData, routingTable);
             }
         }
     }
@@ -239,7 +240,8 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
         }
 
         // Secondly, check that allocating the shard to this node doesn't put it above the high watermark
-        final long shardSize = getExpectedShardSize(shardRouting, allocation, 0);
+        final long shardSize = getExpectedShardSize(shardRouting, 0L,
+            allocation.clusterInfo(), allocation.metaData(), allocation.routingTable());
         double freeSpaceAfterShard = freeDiskPercentageAfterShardAssigned(usage, shardSize);
         long freeBytesAfterShard = freeBytes - shardSize;
         if (freeBytesAfterShard < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes()) {
@@ -342,7 +344,8 @@ private DiskUsage getDiskUsage(RoutingNode node, RoutingAllocation allocation,
         }
 
         if (diskThresholdSettings.includeRelocations()) {
-            long relocatingShardsSize = sizeOfRelocatingShards(node, allocation, subtractLeavingShards, usage.getPath());
+            final long relocatingShardsSize = sizeOfRelocatingShards(node, subtractLeavingShards, usage.getPath(),
+                allocation.clusterInfo(), allocation.metaData(), allocation.routingTable());
             DiskUsage usageIncludingRelocations = new DiskUsage(node.nodeId(), node.node().getName(), usage.getPath(),
                 usage.getTotalBytes(), usage.getFreeBytes() - relocatingShardsSize);
             if (logger.isTraceEnabled()) {
@@ -425,29 +428,28 @@ private Decision earlyTerminate(RoutingAllocation allocation, ImmutableOpenMap<S
      * Returns the expected shard size for the given shard or the default value provided if not enough information are available
      * to estimate the shards size.
      */
-    public static long getExpectedShardSize(ShardRouting shard, RoutingAllocation allocation, long defaultValue) {
-        final IndexMetaData metaData = allocation.metaData().getIndexSafe(shard.index());
-        final ClusterInfo info = allocation.clusterInfo();
-        if (metaData.getResizeSourceIndex() != null && shard.active() == false &&
+    public static long getExpectedShardSize(ShardRouting shard, long defaultValue, ClusterInfo clusterInfo, MetaData metaData,
+                                            RoutingTable routingTable) {
+        final IndexMetaData indexMetaData = metaData.getIndexSafe(shard.index());
+        if (indexMetaData.getResizeSourceIndex() != null && shard.active() == false &&
             shard.recoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS) {
             // in the shrink index case we sum up the source index shards since we basically make a copy of the shard in
             // the worst case
             long targetShardSize = 0;
-            final Index mergeSourceIndex = metaData.getResizeSourceIndex();
-            final IndexMetaData sourceIndexMeta = allocation.metaData().index(mergeSourceIndex);
+            final Index mergeSourceIndex = indexMetaData.getResizeSourceIndex();
+            final IndexMetaData sourceIndexMeta = metaData.index(mergeSourceIndex);
             if (sourceIndexMeta != null) {
                 final Set<ShardId> shardIds = IndexMetaData.selectRecoverFromShards(shard.id(),
-                    sourceIndexMeta, metaData.getNumberOfShards());
-                for (IndexShardRoutingTable shardRoutingTable : allocation.routingTable().index(mergeSourceIndex.getName())) {
+                    sourceIndexMeta, indexMetaData.getNumberOfShards());
+                for (IndexShardRoutingTable shardRoutingTable : routingTable.index(mergeSourceIndex.getName())) {
                     if (shardIds.contains(shardRoutingTable.shardId())) {
-                        targetShardSize += info.getShardSize(shardRoutingTable.primaryShard(), 0);
+                        targetShardSize += clusterInfo.getShardSize(shardRoutingTable.primaryShard(), 0);
                     }
                 }
             }
             return targetShardSize == 0 ? defaultValue : targetShardSize;
         } else {
-            return info.getShardSize(shard, defaultValue);
+            return clusterInfo.getShardSize(shard, defaultValue);
         }
-
     }
 }

server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStartedClusterStateTaskExecutorTests.java

+2-1
@@ -55,7 +55,8 @@ public class ShardStartedClusterStateTaskExecutorTests extends ESAllocationTestC
 
     private ShardStateAction.ShardStartedClusterStateTaskExecutor executor;
 
-    private static void neverReroutes(String reason, Priority priority, ActionListener<Void> listener) {
+    @SuppressWarnings("unused")
+    private static void neverReroutes(String reason, Priority priority, ActionListener<ClusterState> listener) {
         fail("unexpectedly ran a deferred reroute");
     }
