From cc5a2a468f889c26ddb1e5a9da962906bec6c7ac Mon Sep 17 00:00:00 2001
From: David Turner
Date: Wed, 16 Oct 2019 10:10:30 +0100
Subject: [PATCH 1/5] Quieter logging from the DiskThresholdMonitor

Today if an Elasticsearch node reaches a disk watermark then it will repeatedly emit logging about it, which implies that some action needs to be taken by the administrator. This is misleading. Elasticsearch strives to keep nodes under the high watermark, but it is normal to have a few nodes occasionally exceed this level. Nodes may be over the low watermark for an extended period without any ill effects.

This commit enhances the logging emitted by the `DiskThresholdMonitor` to be less misleading. The expected case of hitting the high watermark and immediately relocating one or more shards to bring the node back under the watermark again is reduced in severity to `INFO`. Additionally, `INFO` messages are not emitted repeatedly.

Fixes #48038

---
 .../routing/BatchedRerouteService.java | 8 +-
 .../cluster/routing/RerouteService.java | 3 +-
 .../allocation/DiskThresholdMonitor.java | 183 ++++++++++++------
 .../allocation/DiskThresholdSettings.java | 19 ++
 .../allocator/BalancedShardsAllocator.java | 10 +-
 .../decider/DiskThresholdDecider.java | 40 ++--
 ...dStartedClusterStateTaskExecutorTests.java | 3 +-
 .../allocation/DiskThresholdMonitorTests.java | 168 +++++++++++++++-
 .../DiskThresholdSettingsTests.java | 29 +++
 .../DiskThresholdDeciderUnitTests.java | 53 +++--
 10 files changed, 405 insertions(+), 111 deletions(-)

diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/BatchedRerouteService.java b/server/src/main/java/org/elasticsearch/cluster/routing/BatchedRerouteService.java index 0e387db5f45ef..84ee572770afb 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/BatchedRerouteService.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/BatchedRerouteService.java @@ -50,7 +50,7 @@ public class BatchedRerouteService implements RerouteService { private final Object mutex = new Object(); @Nullable // null if no reroute is currently pending - private List> pendingRerouteListeners; + private List> pendingRerouteListeners; private Priority pendingTaskPriority = Priority.LANGUID; /** @@ -65,8 +65,8 @@ public BatchedRerouteService(ClusterService clusterService, BiFunction listener) { - final List> currentListeners; + public final void reroute(String reason, Priority priority, ActionListener listener) { - final List> currentListeners; synchronized (mutex) { if (pendingRerouteListeners != null) { if (priority.sameOrAfter(pendingTaskPriority)) { @@ -148,7 +148,7 @@ public void onFailure(String source, Exception e) { @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - ActionListener.onResponse(currentListeners, null); + ActionListener.onResponse(currentListeners, newState); } }); } catch (Exception e) {
diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/RerouteService.java b/server/src/main/java/org/elasticsearch/cluster/routing/RerouteService.java index 58f9e41fe88a7..38d27117b0b10 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/RerouteService.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/RerouteService.java @@ -19,6 +19,7 @@ package org.elasticsearch.cluster.routing; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.Priority; /** @@ -33,5 +34,5 @@ public interface 
RerouteService { * this reroute is batched with the pending one; if there is already a pending reroute at a lower priority then * the priority of the pending batch is raised to the given priority. */ - void reroute(String reason, Priority priority, ActionListener listener); + void reroute(String reason, Priority priority, ActionListener listener); } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java index fcd907a60fd0c..00d2c0ecb4fa7 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java @@ -36,6 +36,7 @@ import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider; import org.elasticsearch.common.Priority; import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.ImmutableOpenMap; @@ -43,7 +44,9 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.set.Sets; +import java.util.ArrayList; import java.util.HashSet; +import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -63,13 +66,30 @@ public class DiskThresholdMonitor { private final DiskThresholdSettings diskThresholdSettings; private final Client client; - private final Set nodeHasPassedWatermark = Sets.newConcurrentHashSet(); private final Supplier clusterStateSupplier; private final LongSupplier currentTimeMillisSupplier; private final RerouteService rerouteService; private final AtomicLong lastRunTimeMillis = new AtomicLong(Long.MIN_VALUE); private final AtomicBoolean checkInProgress = new AtomicBoolean(); + /** + * The IDs of the nodes that were over the low threshold in the last check (and maybe over another threshold too). Tracked so that we + * can log when such nodes are no longer over the low threshold. + */ + private final Set nodesOverLowThreshold = Sets.newConcurrentHashSet(); + + /** + * The IDs of the nodes that were over the high threshold in the last check (and maybe over another threshold too). Tracked so that we + * can log when such nodes are no longer over the high threshold. + */ + private final Set nodesOverHighThreshold = Sets.newConcurrentHashSet(); + + /** + * The IDs of the nodes that were over the high threshold in the last check, but which are relocating shards that will bring them + * under the high threshold again. Tracked so that we can log when such nodes are no longer in this state. 
+ */ + private final Set nodesOverHighThresholdAndRelocating = Sets.newConcurrentHashSet(); + public DiskThresholdMonitor(Settings settings, Supplier clusterStateSupplier, ClusterSettings clusterSettings, Client client, LongSupplier currentTimeMillisSupplier, RerouteService rerouteService) { this.clusterStateSupplier = clusterStateSupplier; @@ -79,35 +99,6 @@ public DiskThresholdMonitor(Settings settings, Supplier clusterSta this.client = client; } - /** - * Warn about the given disk usage if the low or high watermark has been passed - */ - private void warnAboutDiskIfNeeded(DiskUsage usage) { - // Check absolute disk values - if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdFloodStage().getBytes()) { - logger.warn("flood stage disk watermark [{}] exceeded on {}, all indices on this node will be marked read-only", - diskThresholdSettings.getFreeBytesThresholdFloodStage(), usage); - } else if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes()) { - logger.warn("high disk watermark [{}] exceeded on {}, shards will be relocated away from this node", - diskThresholdSettings.getFreeBytesThresholdHigh(), usage); - } else if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdLow().getBytes()) { - logger.info("low disk watermark [{}] exceeded on {}, replicas will not be assigned to this node", - diskThresholdSettings.getFreeBytesThresholdLow(), usage); - } - - // Check percentage disk values - if (usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdFloodStage()) { - logger.warn("flood stage disk watermark [{}] exceeded on {}, all indices on this node will be marked read-only", - Strings.format1Decimals(100.0 - diskThresholdSettings.getFreeDiskThresholdFloodStage(), "%"), usage); - } else if (usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdHigh()) { - logger.warn("high disk watermark [{}] exceeded on {}, shards will be relocated away from this node", - Strings.format1Decimals(100.0 - diskThresholdSettings.getFreeDiskThresholdHigh(), "%"), usage); - } else if (usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdLow()) { - logger.info("low disk watermark [{}] exceeded on {}, replicas will not be assigned to this node", - Strings.format1Decimals(100.0 - diskThresholdSettings.getFreeDiskThresholdLow(), "%"), usage); - } - } - private void checkFinished() { final boolean checkFinished = checkInProgress.compareAndSet(true, false); assert checkFinished; @@ -130,38 +121,50 @@ public void onNewInfo(ClusterInfo info) { String explanation = ""; final long currentTimeMillis = currentTimeMillisSupplier.getAsLong(); - // Garbage collect nodes that have been removed from the cluster - // from the map that tracks watermark crossing + // Clean up nodes that have been removed from the cluster final ObjectLookupContainer nodes = usages.keys(); - for (String node : nodeHasPassedWatermark) { - if (nodes.contains(node) == false) { - nodeHasPassedWatermark.remove(node); - } - } + cleanUpRemovedNodes(nodes, nodesOverLowThreshold); + cleanUpRemovedNodes(nodes, nodesOverHighThreshold); + cleanUpRemovedNodes(nodes, nodesOverHighThresholdAndRelocating); + final ClusterState state = clusterStateSupplier.get(); final Set indicesToMarkReadOnly = new HashSet<>(); RoutingNodes routingNodes = state.getRoutingNodes(); Set indicesNotToAutoRelease = new HashSet<>(); markNodesMissingUsageIneligibleForRelease(routingNodes, usages, indicesNotToAutoRelease); + final List usagesOverHighThreshold 
= new ArrayList<>(); + for (final ObjectObjectCursor entry : usages) { final String node = entry.key; final DiskUsage usage = entry.value; - warnAboutDiskIfNeeded(usage); - RoutingNode routingNode = routingNodes.node(node); - // Only unblock index if all nodes that contain shards of it are below the high disk watermark + final RoutingNode routingNode = routingNodes.node(node); + if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdFloodStage().getBytes() || usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdFloodStage()) { - if (routingNode != null) { // this might happen if we haven't got the full cluster-state yet?! + + nodesOverLowThreshold.add(node); + nodesOverHighThreshold.add(node); + nodesOverHighThresholdAndRelocating.remove(node); + + if (routingNode != null) { // might be temporarily null if the ClusterInfoService and the ClusterService are out of step for (ShardRouting routing : routingNode) { String indexName = routing.index().getName(); indicesToMarkReadOnly.add(indexName); indicesNotToAutoRelease.add(indexName); } } + + logger.warn("flood stage disk watermark [{}] exceeded on {}, all indices on this node will be marked read-only", + diskThresholdSettings.describeFloodStageThreshold(), usage); + } else if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes() || usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdHigh()) { - if (routingNode != null) { + + nodesOverLowThreshold.add(node); + nodesOverHighThreshold.add(node); + + if (routingNode != null) { // might be temporarily null if the ClusterInfoService and the ClusterService are out of step for (ShardRouting routing : routingNode) { String indexName = routing.index().getName(); indicesNotToAutoRelease.add(indexName); @@ -170,41 +173,98 @@ public void onNewInfo(ClusterInfo info) { if (lastRunTimeMillis.get() < currentTimeMillis - diskThresholdSettings.getRerouteInterval().millis()) { reroute = true; explanation = "high disk watermark exceeded on one or more nodes"; + usagesOverHighThreshold.add(usage); + // will log about this node when the reroute completes } else { logger.debug("high disk watermark exceeded on {} but an automatic reroute has occurred " + "in the last [{}], skipping reroute", node, diskThresholdSettings.getRerouteInterval()); } - nodeHasPassedWatermark.add(node); + } else if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdLow().getBytes() || usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdLow()) { - nodeHasPassedWatermark.add(node); + + nodesOverHighThresholdAndRelocating.remove(node); + + final boolean wasUnderLowThreshold = nodesOverLowThreshold.add(node); + final boolean wasOverHighThreshold = nodesOverHighThreshold.remove(node); + assert (wasUnderLowThreshold && wasOverHighThreshold) == false; + + if (wasUnderLowThreshold) { + logger.info("low disk watermark [{}] exceeded on {}, replicas will not be assigned to this node", + diskThresholdSettings.describeLowThreshold(), usage); + } else if (wasOverHighThreshold) { + logger.info("high disk watermark [{}] no longer exceeded on {}, but low disk watermark [{}] is still exceeded", + diskThresholdSettings.describeHighThreshold(), usage, diskThresholdSettings.describeLowThreshold()); + } + } else { - if (nodeHasPassedWatermark.contains(node)) { - // The node has previously been over the high or - // low watermark, but is no longer, so we should - // reroute so any unassigned shards can be allocated - // if they are 
able to be + + nodesOverHighThresholdAndRelocating.remove(node); + + if (nodesOverLowThreshold.contains(node)) { + // The node has previously been over the low watermark, but is no longer, so it may be possible to allocate more shards + // if we reroute now. if (lastRunTimeMillis.get() < currentTimeMillis - diskThresholdSettings.getRerouteInterval().millis()) { reroute = true; explanation = "one or more nodes has gone under the high or low watermark"; - nodeHasPassedWatermark.remove(node); + nodesOverLowThreshold.remove(node); + nodesOverHighThreshold.remove(node); + + logger.info("low disk watermark [{}] no longer exceeded on {}", + diskThresholdSettings.describeLowThreshold(), usage); + } else { logger.debug("{} has gone below a disk threshold, but an automatic reroute has occurred " + "in the last [{}], skipping reroute", node, diskThresholdSettings.getRerouteInterval()); } } + } } final ActionListener listener = new GroupedActionListener<>(ActionListener.wrap(this::checkFinished), 3); if (reroute) { - logger.info("rerouting shards: [{}]", explanation); - rerouteService.reroute("disk threshold monitor", Priority.HIGH, ActionListener.wrap(r -> { + logger.debug("rerouting shards: [{}]", explanation); + rerouteService.reroute("disk threshold monitor", Priority.HIGH, ActionListener.wrap(reroutedClusterState -> { + + for (DiskUsage diskUsage : usagesOverHighThreshold) { + final RoutingNode routingNode = reroutedClusterState.getRoutingNodes().node(diskUsage.getNodeId()); + final DiskUsage usageIncludingRelocations; + final long relocatingShardsSize; + if (routingNode != null) { // might be temporarily null if the ClusterInfoService and the ClusterService are out of step + relocatingShardsSize = sizeOfRelocatingShards(info, reroutedClusterState, diskUsage, routingNode); + usageIncludingRelocations = new DiskUsage(diskUsage.getNodeId(), diskUsage.getNodeName(), + diskUsage.getPath(), diskUsage.getTotalBytes(), diskUsage.getFreeBytes() - relocatingShardsSize); + } else { + usageIncludingRelocations = diskUsage; + relocatingShardsSize = 0L; + } + + if (usageIncludingRelocations.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes() + || usageIncludingRelocations.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdHigh()) { + + nodesOverHighThresholdAndRelocating.remove(diskUsage.getNodeId()); + logger.warn("high disk watermark [{}] exceeded on {}, shards will be relocated away from this node; " + + "currently relocating away shards totalling [{}] bytes; the node is expected to continue to exceed " + + "the high disk watermark when these relocations are complete", + diskThresholdSettings.describeHighThreshold(), diskUsage, -relocatingShardsSize); + } else if (nodesOverHighThresholdAndRelocating.add(diskUsage.getNodeId())) { + logger.info("high disk watermark [{}] exceeded on {}, shards will be relocated away from this node; " + + "currently relocating away shards totalling [{}] bytes; the node is expected to be below the high " + + "disk watermark when these relocations are complete", + diskThresholdSettings.describeHighThreshold(), diskUsage, -relocatingShardsSize); + } else { + logger.debug("high disk watermark [{}] exceeded on {}, shards will be relocated away from this node; " + + "currently relocating away shards totalling [{}] bytes", + diskThresholdSettings.describeHighThreshold(), diskUsage, -relocatingShardsSize); + } + } + setLastRunTimeMillis(); - listener.onResponse(r); + listener.onResponse(null); }, e -> { logger.debug("reroute failed", e); 
setLastRunTimeMillis(); @@ -213,7 +273,7 @@ public void onNewInfo(ClusterInfo info) { } else { listener.onResponse(null); } - Set indicesToAutoRelease = StreamSupport.stream(state.routingTable().indicesRouting() + final Set indicesToAutoRelease = StreamSupport.stream(state.routingTable().indicesRouting() .spliterator(), false) .map(c -> c.key) .filter(index -> indicesNotToAutoRelease.contains(index) == false) @@ -236,6 +296,12 @@ public void onNewInfo(ClusterInfo info) { } } + // exposed for tests to override + long sizeOfRelocatingShards(ClusterInfo info, ClusterState reroutedClusterState, DiskUsage diskUsage, RoutingNode routingNode) { + return DiskThresholdDecider.sizeOfRelocatingShards(routingNode, true, + diskUsage.getPath(), info, reroutedClusterState.metaData(), reroutedClusterState.routingTable()); + } + private void markNodesMissingUsageIneligibleForRelease(RoutingNodes routingNodes, ImmutableOpenMap usages, Set indicesToMarkIneligibleForAutoRelease) { for (RoutingNode routingNode : routingNodes) { @@ -246,7 +312,6 @@ private void markNodesMissingUsageIneligibleForRelease(RoutingNodes routingNodes } } } - } private void setLastRunTimeMillis() { @@ -270,4 +335,12 @@ protected void updateIndicesReadOnly(Set indicesToUpdate, ActionListener .setSettings(readOnlySettings) .execute(ActionListener.map(wrappedListener, r -> null)); } + + private static void cleanUpRemovedNodes(ObjectLookupContainer nodes, Set nodesOverLowThreshold) { + for (String node : nodesOverLowThreshold) { + if (nodes.contains(node) == false) { + nodesOverLowThreshold.remove(node); + } + } + } } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdSettings.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdSettings.java index 72e13b28a9b49..bf6bfb5ef328c 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdSettings.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdSettings.java @@ -21,6 +21,7 @@ import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.Version; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; @@ -298,6 +299,24 @@ public TimeValue getRerouteInterval() { return rerouteInterval; } + String describeLowThreshold() { + return freeBytesThresholdLow.equals(ByteSizeValue.ZERO) + ? Strings.format1Decimals(100.0 - freeDiskThresholdLow, "%") + : freeBytesThresholdLow.toString(); + } + + String describeHighThreshold() { + return freeBytesThresholdHigh.equals(ByteSizeValue.ZERO) + ? Strings.format1Decimals(100.0 - freeDiskThresholdHigh, "%") + : freeBytesThresholdHigh.toString(); + } + + String describeFloodStageThreshold() { + return freeBytesThresholdFloodStage.equals(ByteSizeValue.ZERO) + ? Strings.format1Decimals(100.0 - freeDiskThresholdFloodStage, "%") + : freeBytesThresholdFloodStage.toString(); + } + /** * Attempts to parse the watermark into a percentage, returning 100.0% if * it cannot be parsed. 
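Illustrative sketch (not part of the patch): the describe*Threshold() helpers added to DiskThresholdSettings above report a watermark either as a used-space percentage (when the free-bytes threshold is zero, i.e. the setting was given as a percentage) or as the configured absolute size. A minimal standalone approximation of that selection logic follows; the class and method names are hypothetical, and a plain formatter stands in for Strings.format1Decimals and ByteSizeValue.

import java.util.Locale;

// Hypothetical standalone example; mirrors the percentage-vs-bytes choice made by
// DiskThresholdSettings#describeLowThreshold and friends in this patch.
public class WatermarkDescriptionSketch {

    // freeBytesThreshold is 0 when the watermark was configured as a percentage;
    // freeDiskPercentThreshold is the required free space in percent (15.0 for an "85%" watermark).
    static String describeThreshold(long freeBytesThreshold, double freeDiskPercentThreshold) {
        if (freeBytesThreshold == 0L) {
            // Report the used-space percentage, dropping a trailing ".0" so that 85.0 prints as "85%",
            // matching the strings asserted in DiskThresholdSettingsTests later in this patch.
            String percent = String.format(Locale.ROOT, "%.1f", 100.0 - freeDiskPercentThreshold);
            if (percent.endsWith(".0")) {
                percent = percent.substring(0, percent.length() - 2);
            }
            return percent + "%";
        } else {
            // The real code prints a human-readable ByteSizeValue such as "1gb"; plain bytes are used here.
            return freeBytesThreshold + "b";
        }
    }

    public static void main(String[] args) {
        System.out.println(describeThreshold(0L, 15.0));         // "85%"   (percentage-based watermark)
        System.out.println(describeThreshold(0L, 8.8));          // "91.2%"
        System.out.println(describeThreshold(10_485_760L, 0.0)); // "10485760b" (size-based watermark)
    }
}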
diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index a1549c5e217a4..ba92b7a20455c 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -817,8 +817,9 @@ private void allocateUnassigned() { logger.trace("Assigned shard [{}] to [{}]", shard, minNode.getNodeId()); } - final long shardSize = DiskThresholdDecider.getExpectedShardSize(shard, allocation, - ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE); + final long shardSize = DiskThresholdDecider.getExpectedShardSize(shard, + ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE, + allocation.clusterInfo(), allocation.metaData(), allocation.routingTable()); shard = routingNodes.initializeShard(shard, minNode.getNodeId(), null, shardSize, allocation.changes()); minNode.addShard(shard); if (!shard.primary()) { @@ -838,8 +839,9 @@ private void allocateUnassigned() { if (minNode != null) { // throttle decision scenario assert allocationDecision.getAllocationStatus() == AllocationStatus.DECIDERS_THROTTLED; - final long shardSize = DiskThresholdDecider.getExpectedShardSize(shard, allocation, - ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE); + final long shardSize = DiskThresholdDecider.getExpectedShardSize(shard, + ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE, + allocation.clusterInfo(), allocation.metaData(), allocation.routingTable()); minNode.addShard(shard.initialize(minNode.getNodeId(), null, shardSize)); final RoutingNode node = minNode.getRoutingNode(); final Decision.Type nodeLevelDecision = deciders.canAllocate(node, allocation).type(); diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java index 0bb8fdb186d03..a04823b4f1188 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java @@ -27,9 +27,11 @@ import org.elasticsearch.cluster.ClusterInfo; import org.elasticsearch.cluster.DiskUsage; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings; @@ -86,10 +88,9 @@ public DiskThresholdDecider(Settings settings, ClusterSettings clusterSettings) * * If subtractShardsMovingAway is true then the size of shards moving away is subtracted from the total size of all shards */ - static long sizeOfRelocatingShards(RoutingNode node, RoutingAllocation allocation, - boolean subtractShardsMovingAway, String dataPath) { - ClusterInfo clusterInfo = allocation.clusterInfo(); - long totalSize = 0; + public static long sizeOfRelocatingShards(RoutingNode node, boolean subtractShardsMovingAway, String dataPath, ClusterInfo clusterInfo, + 
MetaData metaData, RoutingTable routingTable) { + long totalSize = 0L; for (ShardRouting routing : node.shardsWithState(ShardRoutingState.INITIALIZING)) { if (routing.relocatingNodeId() == null) { @@ -103,7 +104,7 @@ static long sizeOfRelocatingShards(RoutingNode node, RoutingAllocation allocatio // if we don't yet know the actual path of the incoming shard then conservatively assume it's going to the path with the least // free space if (actualPath == null || actualPath.equals(dataPath)) { - totalSize += getExpectedShardSize(routing, allocation, 0); + totalSize += getExpectedShardSize(routing, 0L, clusterInfo, metaData, routingTable); } } @@ -115,7 +116,7 @@ static long sizeOfRelocatingShards(RoutingNode node, RoutingAllocation allocatio actualPath = clusterInfo.getDataPath(routing.cancelRelocation()); } if (dataPath.equals(actualPath)) { - totalSize -= getExpectedShardSize(routing, allocation, 0); + totalSize -= getExpectedShardSize(routing, 0L, clusterInfo, metaData, routingTable); } } } @@ -239,7 +240,8 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing } // Secondly, check that allocating the shard to this node doesn't put it above the high watermark - final long shardSize = getExpectedShardSize(shardRouting, allocation, 0); + final long shardSize = getExpectedShardSize(shardRouting, 0L, + allocation.clusterInfo(), allocation.metaData(), allocation.routingTable()); double freeSpaceAfterShard = freeDiskPercentageAfterShardAssigned(usage, shardSize); long freeBytesAfterShard = freeBytes - shardSize; if (freeBytesAfterShard < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes()) { @@ -339,7 +341,8 @@ private DiskUsage getDiskUsage(RoutingNode node, RoutingAllocation allocation, node.nodeId(), usage.getTotalBytes(), usage.getFreeBytes(), usage.getFreeDiskAsPercentage()); } - final long relocatingShardsSize = sizeOfRelocatingShards(node, allocation, subtractLeavingShards, usage.getPath()); + final long relocatingShardsSize = sizeOfRelocatingShards(node, subtractLeavingShards, usage.getPath(), + allocation.clusterInfo(), allocation.metaData(), allocation.routingTable()); final DiskUsage usageIncludingRelocations = new DiskUsage(node.nodeId(), node.node().getName(), usage.getPath(), usage.getTotalBytes(), usage.getFreeBytes() - relocatingShardsSize); logger.trace("getDiskUsage: usage [{}] with [{}] bytes relocating yields [{}]", @@ -418,29 +421,28 @@ private Decision earlyTerminate(RoutingAllocation allocation, ImmutableOpenMap shardIds = IndexMetaData.selectRecoverFromShards(shard.id(), - sourceIndexMeta, metaData.getNumberOfShards()); - for (IndexShardRoutingTable shardRoutingTable : allocation.routingTable().index(mergeSourceIndex.getName())) { + sourceIndexMeta, indexMetaData.getNumberOfShards()); + for (IndexShardRoutingTable shardRoutingTable : routingTable.index(mergeSourceIndex.getName())) { if (shardIds.contains(shardRoutingTable.shardId())) { - targetShardSize += info.getShardSize(shardRoutingTable.primaryShard(), 0); + targetShardSize += clusterInfo.getShardSize(shardRoutingTable.primaryShard(), 0); } } } return targetShardSize == 0 ? 
defaultValue : targetShardSize; } else { - return info.getShardSize(shard, defaultValue); + return clusterInfo.getShardSize(shard, defaultValue); } - } } diff --git a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStartedClusterStateTaskExecutorTests.java b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStartedClusterStateTaskExecutorTests.java index 51ee06b0f3e0e..d4424168f49ee 100644 --- a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStartedClusterStateTaskExecutorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStartedClusterStateTaskExecutorTests.java @@ -55,7 +55,8 @@ public class ShardStartedClusterStateTaskExecutorTests extends ESAllocationTestC private ShardStateAction.ShardStartedClusterStateTaskExecutor executor; - private static void neverReroutes(String reason, Priority priority, ActionListener listener) { + @SuppressWarnings("unused") + private static void neverReroutes(String reason, Priority priority, ActionListener listener) { fail("unexpectedly ran a deferred reroute"); } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java index 64406888bc239..e4f8ab52617e4 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java @@ -18,6 +18,9 @@ */ package org.elasticsearch.cluster.routing.allocation; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterInfo; @@ -30,12 +33,16 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.common.Priority; import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.MockLogAppender; +import org.elasticsearch.test.junit.annotations.TestLogging; import java.util.Arrays; import java.util.Collections; @@ -44,6 +51,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.LongSupplier; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.equalTo; @@ -146,7 +154,7 @@ public void testDoesNotSubmitRerouteTaskTooFrequently() { final ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) .nodes(DiscoveryNodes.builder().add(newNode("node1")).add(newNode("node2"))).build(); AtomicLong currentTime = new AtomicLong(); - AtomicReference> listenerReference = new AtomicReference<>(); + AtomicReference> listenerReference = new AtomicReference<>(); DiskThresholdMonitor monitor = new DiskThresholdMonitor(Settings.EMPTY, () -> clusterState, new ClusterSettings(Settings.EMPTY, 
ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, currentTime::get, (reason, priority, listener) -> { @@ -180,7 +188,7 @@ protected void updateIndicesReadOnly(Set indicesToMarkReadOnly, ActionLi currentTime.addAndGet(randomLongBetween(0, 120000)); monitor.onNewInfo(new ClusterInfo(oneDiskAboveWatermark, null, null, null)); assertNotNull(listenerReference.get()); - listenerReference.getAndSet(null).onResponse(null); + listenerReference.getAndSet(null).onResponse(clusterState); if (randomBoolean()) { // should not re-route again within the reroute interval @@ -195,7 +203,7 @@ protected void updateIndicesReadOnly(Set indicesToMarkReadOnly, ActionLi DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.get(Settings.EMPTY).millis() + 1, 120000)); monitor.onNewInfo(new ClusterInfo(oneDiskAboveWatermark, null, null, null)); assertNotNull(listenerReference.get()); - final ActionListener rerouteListener1 = listenerReference.getAndSet(null); + final ActionListener rerouteListener1 = listenerReference.getAndSet(null); // should not re-route again before reroute has completed currentTime.addAndGet(randomLongBetween(0, 120000)); @@ -203,7 +211,7 @@ protected void updateIndicesReadOnly(Set indicesToMarkReadOnly, ActionLi assertNull(listenerReference.get()); // complete reroute - rerouteListener1.onResponse(null); + rerouteListener1.onResponse(clusterState); if (randomBoolean()) { // should not re-route again within the reroute interval @@ -250,7 +258,7 @@ public void testAutoReleaseIndices() { (reason, priority, listener) -> { assertNotNull(listener); assertThat(priority, equalTo(Priority.HIGH)); - listener.onResponse(null); + listener.onResponse(clusterState); }) { @Override protected void updateIndicesReadOnly(Set indicesToUpdate, ActionListener listener, boolean readOnly) { @@ -287,7 +295,7 @@ protected void updateIndicesReadOnly(Set indicesToUpdate, ActionListener (reason, priority, listener) -> { assertNotNull(listener); assertThat(priority, equalTo(Priority.HIGH)); - listener.onResponse(null); + listener.onResponse(clusterStateWithBlocks); }) { @Override protected void updateIndicesReadOnly(Set indicesToUpdate, ActionListener listener, boolean readOnly) { @@ -365,4 +373,152 @@ protected void updateIndicesReadOnly(Set indicesToUpdate, ActionListener assertThat(indicesToMarkReadOnly.get(), contains("test_1")); assertNull(indicesToRelease.get()); } + + @TestLogging(value="org.elasticsearch.cluster.routing.allocation.DiskThresholdMonitor:INFO", reason="testing INFO/WARN logging") + public void testDiskMonitorLogging() throws IllegalAccessException { + final ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .nodes(DiscoveryNodes.builder().add(newNode("node1"))).build(); + final AtomicReference clusterStateRef = new AtomicReference<>(clusterState); + + final LongSupplier timeSupplier = new LongSupplier() { + long time; + + @Override + public long getAsLong() { + // advance time every check + time += DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.get(Settings.EMPTY).getMillis() + 1; + return time; + } + }; + + final AtomicLong relocatingShardSizeRef = new AtomicLong(); + + DiskThresholdMonitor monitor = new DiskThresholdMonitor(Settings.EMPTY, clusterStateRef::get, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, timeSupplier, + (reason, priority, listener) -> listener.onResponse(clusterStateRef.get())) { + @Override + protected void updateIndicesReadOnly(Set 
indicesToMarkReadOnly, ActionListener listener, boolean readOnly) { + listener.onResponse(null); + } + + @Override + long sizeOfRelocatingShards(ClusterInfo info, ClusterState reroutedClusterState, DiskUsage diskUsage, RoutingNode routingNode) { + return relocatingShardSizeRef.get(); + } + }; + + final ImmutableOpenMap.Builder allDisksOkBuilder; + allDisksOkBuilder = ImmutableOpenMap.builder(); + allDisksOkBuilder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(15, 100))); + final ImmutableOpenMap allDisksOk = allDisksOkBuilder.build(); + + final ImmutableOpenMap.Builder aboveLowWatermarkBuilder = ImmutableOpenMap.builder(); + aboveLowWatermarkBuilder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(10, 14))); + final ImmutableOpenMap aboveLowWatermark = aboveLowWatermarkBuilder.build(); + + final ImmutableOpenMap.Builder aboveHighWatermarkBuilder = ImmutableOpenMap.builder(); + aboveHighWatermarkBuilder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(5, 9))); + final ImmutableOpenMap aboveHighWatermark = aboveHighWatermarkBuilder.build(); + + final ImmutableOpenMap.Builder aboveFloodStageWatermarkBuilder = ImmutableOpenMap.builder(); + aboveFloodStageWatermarkBuilder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(0, 4))); + final ImmutableOpenMap aboveFloodStageWatermark = aboveFloodStageWatermarkBuilder.build(); + + assertNoLogging(monitor, allDisksOk); + + assertSingleInfoMessage(monitor, aboveLowWatermark, + "low disk watermark [85%] exceeded on * replicas will not be assigned to this node"); + + assertRepeatedWarningMessages(monitor, aboveHighWatermark, + "high disk watermark [90%] exceeded on * shards will be relocated away from this node* " + + "the node is expected to continue to exceed the high disk watermark when these relocations are complete"); + + assertRepeatedWarningMessages(monitor, aboveFloodStageWatermark, + "flood stage disk watermark [95%] exceeded on * all indices on this node will be marked read-only"); + + relocatingShardSizeRef.set(-5L); + assertSingleInfoMessage(monitor, aboveHighWatermark, + "high disk watermark [90%] exceeded on * shards will be relocated away from this node* " + + "the node is expected to be below the high disk watermark when these relocations are complete"); + + relocatingShardSizeRef.set(0L); + assertRepeatedWarningMessages(monitor, aboveHighWatermark, + "high disk watermark [90%] exceeded on * shards will be relocated away from this node* " + + "the node is expected to continue to exceed the high disk watermark when these relocations are complete"); + + assertSingleInfoMessage(monitor, aboveLowWatermark, + "high disk watermark [90%] no longer exceeded on * but low disk watermark [85%] is still exceeded"); + + assertSingleInfoMessage(monitor, allDisksOk, + "low disk watermark [85%] no longer exceeded on *"); + } + + private void assertNoLogging(DiskThresholdMonitor monitor, + ImmutableOpenMap diskUsages) throws IllegalAccessException { + MockLogAppender mockAppender = new MockLogAppender(); + mockAppender.start(); + mockAppender.addExpectation(new MockLogAppender.UnseenEventExpectation( + "any INFO message", + DiskThresholdMonitor.class.getCanonicalName(), + Level.INFO, + "*")); + mockAppender.addExpectation(new MockLogAppender.UnseenEventExpectation( + "any WARN message", + DiskThresholdMonitor.class.getCanonicalName(), + Level.WARN, + "*")); + + Logger diskThresholdMonitorLogger = LogManager.getLogger(DiskThresholdMonitor.class); + 
Loggers.addAppender(diskThresholdMonitorLogger, mockAppender); + + for (int i = between(1, 3); i >= 0; i--) { + monitor.onNewInfo(new ClusterInfo(diskUsages, null, null, null)); + } + + mockAppender.assertAllExpectationsMatched(); + Loggers.removeAppender(diskThresholdMonitorLogger, mockAppender); + mockAppender.stop(); + } + + private void assertRepeatedWarningMessages(DiskThresholdMonitor monitor, + ImmutableOpenMap diskUsages, + String message) throws IllegalAccessException { + for (int i = between(1, 3); i >= 0; i--) { + assertLogging(monitor, diskUsages, Level.WARN, message); + } + } + + private void assertSingleInfoMessage(DiskThresholdMonitor monitor, + ImmutableOpenMap diskUsages, + String message) throws IllegalAccessException { + assertLogging(monitor, diskUsages, Level.INFO, message); + assertNoLogging(monitor, diskUsages); + } + + private void assertLogging(DiskThresholdMonitor monitor, + ImmutableOpenMap diskUsages, + Level level, + String message) throws IllegalAccessException { + MockLogAppender mockAppender = new MockLogAppender(); + mockAppender.start(); + mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation( + "expected message", + DiskThresholdMonitor.class.getCanonicalName(), + level, + message)); + mockAppender.addExpectation(new MockLogAppender.UnseenEventExpectation( + "any message of another level", + DiskThresholdMonitor.class.getCanonicalName(), + level == Level.INFO ? Level.WARN : Level.INFO, + "*")); + + Logger diskThresholdMonitorLogger = LogManager.getLogger(DiskThresholdMonitor.class); + Loggers.addAppender(diskThresholdMonitorLogger, mockAppender); + + monitor.onNewInfo(new ClusterInfo(diskUsages, null, null, null)); + + mockAppender.assertAllExpectationsMatched(); + Loggers.removeAppender(diskThresholdMonitorLogger, mockAppender); + mockAppender.stop(); + } } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdSettingsTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdSettingsTests.java index 6272ffc751aff..5bbd3ab33ce74 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdSettingsTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdSettingsTests.java @@ -249,4 +249,33 @@ public void testSequenceOfUpdates() { } } + public void testThresholdDescriptions() { + final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + + DiskThresholdSettings diskThresholdSettings = new DiskThresholdSettings(Settings.EMPTY, clusterSettings); + assertThat(diskThresholdSettings.describeLowThreshold(), equalTo("85%")); + assertThat(diskThresholdSettings.describeHighThreshold(), equalTo("90%")); + assertThat(diskThresholdSettings.describeFloodStageThreshold(), equalTo("95%")); + + diskThresholdSettings = new DiskThresholdSettings(Settings.builder() + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "91.2%") + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "91.3%") + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), "91.4%") + .build(), clusterSettings); + + assertThat(diskThresholdSettings.describeLowThreshold(), equalTo("91.2%")); + assertThat(diskThresholdSettings.describeHighThreshold(), equalTo("91.3%")); + assertThat(diskThresholdSettings.describeFloodStageThreshold(), equalTo("91.4%")); + + 
diskThresholdSettings = new DiskThresholdSettings(Settings.builder() + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "1GB") + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "10MB") + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), "1B") + .build(), clusterSettings); + + assertThat(diskThresholdSettings.describeLowThreshold(), equalTo("1gb")); + assertThat(diskThresholdSettings.describeHighThreshold(), equalTo("10mb")); + assertThat(diskThresholdSettings.describeFloodStageThreshold(), equalTo("1b")); + } + } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java index 3f1975f35369a..55f4154680a05 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java @@ -315,22 +315,22 @@ public void testShardSizeAndRelocatingSize() { test_2 = ShardRoutingHelper.initialize(test_2, "node1"); test_2 = ShardRoutingHelper.moveToStarted(test_2); - assertEquals(1000L, DiskThresholdDecider.getExpectedShardSize(test_2, allocation, 0)); - assertEquals(100L, DiskThresholdDecider.getExpectedShardSize(test_1, allocation, 0)); - assertEquals(10L, DiskThresholdDecider.getExpectedShardSize(test_0, allocation, 0)); + assertEquals(1000L, getExpectedShardSize(test_2, 0L, allocation)); + assertEquals(100L, getExpectedShardSize(test_1, 0L, allocation)); + assertEquals(10L, getExpectedShardSize(test_0, 0L, allocation)); RoutingNode node = new RoutingNode("node1", new DiscoveryNode("node1", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT), test_0, test_1.getTargetRelocatingShard(), test_2); - assertEquals(100L, DiskThresholdDecider.sizeOfRelocatingShards(node, allocation, false, "/dev/null")); - assertEquals(90L, DiskThresholdDecider.sizeOfRelocatingShards(node, allocation, true, "/dev/null")); - assertEquals(0L, DiskThresholdDecider.sizeOfRelocatingShards(node, allocation, true, "/dev/some/other/dev")); - assertEquals(0L, DiskThresholdDecider.sizeOfRelocatingShards(node, allocation, true, "/dev/some/other/dev")); + assertEquals(100L, sizeOfRelocatingShards(allocation, node, false, "/dev/null")); + assertEquals(90L, sizeOfRelocatingShards(allocation, node, true, "/dev/null")); + assertEquals(0L, sizeOfRelocatingShards(allocation, node, true, "/dev/some/other/dev")); + assertEquals(0L, sizeOfRelocatingShards(allocation, node, true, "/dev/some/other/dev")); ShardRouting test_3 = ShardRouting.newUnassigned(new ShardId(index, 3), false, PeerRecoverySource.INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo")); test_3 = ShardRoutingHelper.initialize(test_3, "node1"); test_3 = ShardRoutingHelper.moveToStarted(test_3); - assertEquals(0L, DiskThresholdDecider.getExpectedShardSize(test_3, allocation, 0)); + assertEquals(0L, getExpectedShardSize(test_3, 0L, allocation)); ShardRouting other_0 = ShardRouting.newUnassigned(new ShardId("other", "5678", 0), randomBoolean(), @@ -342,14 +342,19 @@ public void testShardSizeAndRelocatingSize() { node = new RoutingNode("node1", new DiscoveryNode("node1", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT), test_0, 
test_1.getTargetRelocatingShard(), test_2, other_0.getTargetRelocatingShard()); if (other_0.primary()) { - assertEquals(10100L, DiskThresholdDecider.sizeOfRelocatingShards(node, allocation, false, "/dev/null")); - assertEquals(10090L, DiskThresholdDecider.sizeOfRelocatingShards(node, allocation, true, "/dev/null")); + assertEquals(10100L, sizeOfRelocatingShards(allocation, node, false, "/dev/null")); + assertEquals(10090L, sizeOfRelocatingShards(allocation, node, true, "/dev/null")); } else { - assertEquals(100L, DiskThresholdDecider.sizeOfRelocatingShards(node, allocation, false, "/dev/null")); - assertEquals(90L, DiskThresholdDecider.sizeOfRelocatingShards(node, allocation, true, "/dev/null")); + assertEquals(100L, sizeOfRelocatingShards(allocation, node, false, "/dev/null")); + assertEquals(90L, sizeOfRelocatingShards(allocation, node, true, "/dev/null")); } } + public long sizeOfRelocatingShards(RoutingAllocation allocation, RoutingNode node, boolean subtractShardsMovingAway, String dataPath) { + return DiskThresholdDecider.sizeOfRelocatingShards(node, subtractShardsMovingAway, dataPath, + allocation.clusterInfo(), allocation.metaData(), allocation.routingTable()); + } + public void testSizeShrinkIndex() { ImmutableOpenMap.Builder shardSizes = ImmutableOpenMap.builder(); shardSizes.put("[test][0][p]", 10L); @@ -404,22 +409,22 @@ public void testSizeShrinkIndex() { ShardRouting test_3 = ShardRouting.newUnassigned(new ShardId(index, 3), true, LocalShardsRecoverySource.INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo")); test_3 = ShardRoutingHelper.initialize(test_3, "node1"); - assertEquals(500L, DiskThresholdDecider.getExpectedShardSize(test_3, allocation, 0)); - assertEquals(500L, DiskThresholdDecider.getExpectedShardSize(test_2, allocation, 0)); - assertEquals(100L, DiskThresholdDecider.getExpectedShardSize(test_1, allocation, 0)); - assertEquals(10L, DiskThresholdDecider.getExpectedShardSize(test_0, allocation, 0)); + assertEquals(500L, getExpectedShardSize(test_3, 0L, allocation)); + assertEquals(500L, getExpectedShardSize(test_2, 0L, allocation)); + assertEquals(100L, getExpectedShardSize(test_1, 0L, allocation)); + assertEquals(10L, getExpectedShardSize(test_0, 0L, allocation)); ShardRouting target = ShardRouting.newUnassigned(new ShardId(new Index("target", "5678"), 0), true, LocalShardsRecoverySource.INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo")); - assertEquals(1110L, DiskThresholdDecider.getExpectedShardSize(target, allocation, 0)); + assertEquals(1110L, getExpectedShardSize(target, 0L, allocation)); ShardRouting target2 = ShardRouting.newUnassigned(new ShardId(new Index("target2", "9101112"), 0), true, LocalShardsRecoverySource.INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo")); - assertEquals(110L, DiskThresholdDecider.getExpectedShardSize(target2, allocation, 0)); + assertEquals(110L, getExpectedShardSize(target2, 0L, allocation)); target2 = ShardRouting.newUnassigned(new ShardId(new Index("target2", "9101112"), 1), true, LocalShardsRecoverySource.INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo")); - assertEquals(1000L, DiskThresholdDecider.getExpectedShardSize(target2, allocation, 0)); + assertEquals(1000L, getExpectedShardSize(target2, 0L, allocation)); // check that the DiskThresholdDecider still works even if the source index has been deleted ClusterState clusterStateWithMissingSourceIndex = ClusterState.builder(clusterState) @@ -430,7 +435,13 @@ public void 
testSizeShrinkIndex() { allocationService.reroute(clusterState, "foo"); RoutingAllocation allocationWithMissingSourceIndex = new RoutingAllocation(null, clusterStateWithMissingSourceIndex.getRoutingNodes(), clusterStateWithMissingSourceIndex, info, 0); - assertEquals(42L, DiskThresholdDecider.getExpectedShardSize(target, allocationWithMissingSourceIndex, 42L)); - assertEquals(42L, DiskThresholdDecider.getExpectedShardSize(target2, allocationWithMissingSourceIndex, 42L)); + assertEquals(42L, getExpectedShardSize(target, 42L, allocationWithMissingSourceIndex)); + assertEquals(42L, getExpectedShardSize(target2, 42L, allocationWithMissingSourceIndex)); } + + private static long getExpectedShardSize(ShardRouting shardRouting, long defaultSize, RoutingAllocation allocation) { + return DiskThresholdDecider.getExpectedShardSize(shardRouting, defaultSize, + allocation.clusterInfo(), allocation.metaData(), allocation.routingTable()); + } + } From b51d525924d3aaa616b072fa9870c98ad7f5cb24 Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 18 Oct 2019 13:29:09 +0100 Subject: [PATCH 2/5] Rename parameters --- .../cluster/routing/allocation/DiskThresholdMonitor.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java index 00d2c0ecb4fa7..c4aa724ae3996 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java @@ -336,10 +336,10 @@ protected void updateIndicesReadOnly(Set indicesToUpdate, ActionListener .execute(ActionListener.map(wrappedListener, r -> null)); } - private static void cleanUpRemovedNodes(ObjectLookupContainer nodes, Set nodesOverLowThreshold) { - for (String node : nodesOverLowThreshold) { - if (nodes.contains(node) == false) { - nodesOverLowThreshold.remove(node); + private static void cleanUpRemovedNodes(ObjectLookupContainer nodesToKeep, Set nodesToCleanUp) { + for (String node : nodesToCleanUp) { + if (nodesToKeep.contains(node) == false) { + nodesToCleanUp.remove(node); } } } From 895ffa8620276992e130ba96e8d74b8fa1b87b17 Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 18 Oct 2019 13:29:47 +0100 Subject: [PATCH 3/5] Reorder parameters --- .../cluster/routing/allocation/DiskThresholdMonitor.java | 4 ++-- .../cluster/routing/allocation/DiskThresholdMonitorTests.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java index c4aa724ae3996..783b38288db50 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java @@ -235,7 +235,7 @@ public void onNewInfo(ClusterInfo info) { final DiskUsage usageIncludingRelocations; final long relocatingShardsSize; if (routingNode != null) { // might be temporarily null if the ClusterInfoService and the ClusterService are out of step - relocatingShardsSize = sizeOfRelocatingShards(info, reroutedClusterState, diskUsage, routingNode); + relocatingShardsSize = sizeOfRelocatingShards(routingNode, diskUsage, info, reroutedClusterState); usageIncludingRelocations = 
new DiskUsage(diskUsage.getNodeId(), diskUsage.getNodeName(), diskUsage.getPath(), diskUsage.getTotalBytes(), diskUsage.getFreeBytes() - relocatingShardsSize); } else { @@ -297,7 +297,7 @@ public void onNewInfo(ClusterInfo info) { } // exposed for tests to override - long sizeOfRelocatingShards(ClusterInfo info, ClusterState reroutedClusterState, DiskUsage diskUsage, RoutingNode routingNode) { + long sizeOfRelocatingShards(RoutingNode routingNode, DiskUsage diskUsage, ClusterInfo info, ClusterState reroutedClusterState) { return DiskThresholdDecider.sizeOfRelocatingShards(routingNode, true, diskUsage.getPath(), info, reroutedClusterState.metaData(), reroutedClusterState.routingTable()); } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java index e4f8ab52617e4..3a12db09855c6 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java @@ -402,7 +402,7 @@ protected void updateIndicesReadOnly(Set indicesToMarkReadOnly, ActionLi } @Override - long sizeOfRelocatingShards(ClusterInfo info, ClusterState reroutedClusterState, DiskUsage diskUsage, RoutingNode routingNode) { + long sizeOfRelocatingShards(RoutingNode routingNode, DiskUsage diskUsage, ClusterInfo info, ClusterState reroutedClusterState) { return relocatingShardSizeRef.get(); } }; From ff041dcd42986c3ed6da5192e3d5a782393c483a Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 18 Oct 2019 13:33:46 +0100 Subject: [PATCH 4/5] Assert messages emitted when passing two watermarks at once --- .../allocation/DiskThresholdMonitorTests.java | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java index 3a12db09855c6..5419a37d4d4cc 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java @@ -451,6 +451,26 @@ long sizeOfRelocatingShards(RoutingNode routingNode, DiskUsage diskUsage, Cluste assertSingleInfoMessage(monitor, allDisksOk, "low disk watermark [85%] no longer exceeded on *"); + + assertRepeatedWarningMessages(monitor, aboveFloodStageWatermark, + "flood stage disk watermark [95%] exceeded on * all indices on this node will be marked read-only"); + + assertSingleInfoMessage(monitor, allDisksOk, + "low disk watermark [85%] no longer exceeded on *"); + + assertRepeatedWarningMessages(monitor, aboveHighWatermark, + "high disk watermark [90%] exceeded on * shards will be relocated away from this node* " + + "the node is expected to continue to exceed the high disk watermark when these relocations are complete"); + + assertSingleInfoMessage(monitor, allDisksOk, + "low disk watermark [85%] no longer exceeded on *"); + + assertRepeatedWarningMessages(monitor, aboveFloodStageWatermark, + "flood stage disk watermark [95%] exceeded on * all indices on this node will be marked read-only"); + + assertSingleInfoMessage(monitor, aboveLowWatermark, + "high disk watermark [90%] no longer exceeded on * but low disk watermark [85%] is still exceeded"); + } private void 
assertNoLogging(DiskThresholdMonitor monitor, From a866bcbcfee1b2abf450830e92af0adbde372873 Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 18 Oct 2019 13:52:52 +0100 Subject: [PATCH 5/5] Add some testing of behaviour if time does not pass --- .../allocation/DiskThresholdMonitorTests.java | 33 +++++++++++++++++-- 1 file changed, 31 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java index 5419a37d4d4cc..61c152917958b 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java @@ -379,14 +379,17 @@ public void testDiskMonitorLogging() throws IllegalAccessException { final ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) .nodes(DiscoveryNodes.builder().add(newNode("node1"))).build(); final AtomicReference clusterStateRef = new AtomicReference<>(clusterState); + final AtomicBoolean advanceTime = new AtomicBoolean(randomBoolean()); final LongSupplier timeSupplier = new LongSupplier() { long time; @Override public long getAsLong() { - // advance time every check - time += DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.get(Settings.EMPTY).getMillis() + 1; + if (advanceTime.get()) { + time += DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.get(Settings.EMPTY).getMillis() + 1; + } + logger.info("time: [{}]", time); return time; } }; @@ -429,35 +432,54 @@ long sizeOfRelocatingShards(RoutingNode routingNode, DiskUsage diskUsage, Cluste assertSingleInfoMessage(monitor, aboveLowWatermark, "low disk watermark [85%] exceeded on * replicas will not be assigned to this node"); + advanceTime.set(false); // will do one reroute and emit warnings, but subsequent reroutes and associated messages are delayed + assertSingleWarningMessage(monitor, aboveHighWatermark, + "high disk watermark [90%] exceeded on * shards will be relocated away from this node* " + + "the node is expected to continue to exceed the high disk watermark when these relocations are complete"); + + advanceTime.set(true); assertRepeatedWarningMessages(monitor, aboveHighWatermark, "high disk watermark [90%] exceeded on * shards will be relocated away from this node* " + "the node is expected to continue to exceed the high disk watermark when these relocations are complete"); + advanceTime.set(randomBoolean()); assertRepeatedWarningMessages(monitor, aboveFloodStageWatermark, "flood stage disk watermark [95%] exceeded on * all indices on this node will be marked read-only"); relocatingShardSizeRef.set(-5L); + advanceTime.set(true); assertSingleInfoMessage(monitor, aboveHighWatermark, "high disk watermark [90%] exceeded on * shards will be relocated away from this node* " + "the node is expected to be below the high disk watermark when these relocations are complete"); relocatingShardSizeRef.set(0L); + timeSupplier.getAsLong(); // advance time long enough to do another reroute + advanceTime.set(false); // will do one reroute and emit warnings, but subsequent reroutes and associated messages are delayed + assertSingleWarningMessage(monitor, aboveHighWatermark, + "high disk watermark [90%] exceeded on * shards will be relocated away from this node* " + + "the node is expected to continue to exceed 
the high disk watermark when these relocations are complete"); + + advanceTime.set(true); assertRepeatedWarningMessages(monitor, aboveHighWatermark, "high disk watermark [90%] exceeded on * shards will be relocated away from this node* " + "the node is expected to continue to exceed the high disk watermark when these relocations are complete"); + advanceTime.set(randomBoolean()); assertSingleInfoMessage(monitor, aboveLowWatermark, "high disk watermark [90%] no longer exceeded on * but low disk watermark [85%] is still exceeded"); + advanceTime.set(true); // only log about dropping below the low disk watermark on a reroute assertSingleInfoMessage(monitor, allDisksOk, "low disk watermark [85%] no longer exceeded on *"); + advanceTime.set(randomBoolean()); assertRepeatedWarningMessages(monitor, aboveFloodStageWatermark, "flood stage disk watermark [95%] exceeded on * all indices on this node will be marked read-only"); assertSingleInfoMessage(monitor, allDisksOk, "low disk watermark [85%] no longer exceeded on *"); + advanceTime.set(true); assertRepeatedWarningMessages(monitor, aboveHighWatermark, "high disk watermark [90%] exceeded on * shards will be relocated away from this node* " + "the node is expected to continue to exceed the high disk watermark when these relocations are complete"); @@ -508,6 +530,13 @@ private void assertRepeatedWarningMessages(DiskThresholdMonitor monitor, } } + private void assertSingleWarningMessage(DiskThresholdMonitor monitor, + ImmutableOpenMap diskUsages, + String message) throws IllegalAccessException { + assertLogging(monitor, diskUsages, Level.WARN, message); + assertNoLogging(monitor, diskUsages); + } + private void assertSingleInfoMessage(DiskThresholdMonitor monitor, ImmutableOpenMap diskUsages, String message) throws IllegalAccessException {