From 0a39ebd9d894a0910524c33cf86d5df7444cba1e Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 29 Aug 2019 12:39:28 +0100 Subject: [PATCH 1/2] Avoid overshooting watermarks during relocation Today the `DiskThresholdDecider` attempts to account for already-relocating shards when deciding how to allocate or relocate a shard. Its goal is to stop relocating shards onto a node before that node exceeds the low watermark, and to stop relocating shards away from a node as soon as the node drops below the high watermark. The decider handles multiple data paths by only accounting for relocating shards that affect the appropriate data path. However, this mechanism does not correctly account for _new_ relocating shards, which are unwittingly ignored. This means that we may evict far too many shards from a node above the high watermark, and may relocate far too many shards onto a node causing it to blow right past the low watermark and potentially other watermarks too. There are in fact two distinct issues that this PR fixes. New incoming shards have an unknown data path until the `ClusterInfoService` refreshes its statistics. New outgoing shards have a known data path, but we fail to account for the change of the corresponding `ShardRouting` from `STARTED` to `RELOCATING`, meaning that we fail to find the correct data path and treat the path as unknown here too. This PR also reworks the `MockDiskUsagesIT` test to avoid using fake data paths for all shards. With the changes here, the data paths are handled in tests as they are in production, except that their sizes are fake. Fixes #45177 Backport of #46079 --- .../cluster/InternalClusterInfoService.java | 20 +- .../decider/DiskThresholdDecider.java | 32 +- .../decider/DiskThresholdDeciderTests.java | 17 +- .../DiskThresholdDeciderUnitTests.java | 2 +- .../allocation/decider/MockDiskUsagesIT.java | 366 ++++++++++++++---- .../MockInternalClusterInfoService.java | 114 ++---- 6 files changed, 383 insertions(+), 168 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java b/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java index f06d69057ccea..d5feb18544572 100644 --- a/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java +++ b/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java @@ -278,6 +278,11 @@ private void maybeRefresh() { } } + // allow tests to adjust the node stats on receipt + List adjustNodesStats(List nodeStats) { + return nodeStats; + } + /** * Refreshes the ClusterInfo in a blocking fashion */ @@ -287,12 +292,13 @@ public final ClusterInfo refresh() { } final CountDownLatch nodeLatch = updateNodeStats(new ActionListener() { @Override - public void onResponse(NodesStatsResponse nodeStatses) { - ImmutableOpenMap.Builder newLeastAvaiableUsages = ImmutableOpenMap.builder(); - ImmutableOpenMap.Builder newMostAvaiableUsages = ImmutableOpenMap.builder(); - fillDiskUsagePerNode(logger, nodeStatses.getNodes(), newLeastAvaiableUsages, newMostAvaiableUsages); - leastAvailableSpaceUsages = newLeastAvaiableUsages.build(); - mostAvailableSpaceUsages = newMostAvaiableUsages.build(); + public void onResponse(NodesStatsResponse nodesStatsResponse) { + ImmutableOpenMap.Builder leastAvailableUsagesBuilder = ImmutableOpenMap.builder(); + ImmutableOpenMap.Builder mostAvailableUsagesBuilder = ImmutableOpenMap.builder(); + fillDiskUsagePerNode(logger, adjustNodesStats(nodesStatsResponse.getNodes()), + leastAvailableUsagesBuilder, mostAvailableUsagesBuilder); + leastAvailableSpaceUsages = leastAvailableUsagesBuilder.build(); + mostAvailableSpaceUsages = mostAvailableUsagesBuilder.build(); } @Override @@ -394,7 +400,7 @@ static void fillDiskUsagePerNode(Logger logger, List nodeStatsArray, if (leastAvailablePath == null) { assert mostAvailablePath == null; mostAvailablePath = leastAvailablePath = info; - } else if (leastAvailablePath.getAvailable().getBytes() > info.getAvailable().getBytes()){ + } else if (leastAvailablePath.getAvailable().getBytes() > info.getAvailable().getBytes()) { leastAvailablePath = info; } else if (mostAvailablePath.getAvailable().getBytes() < info.getAvailable().getBytes()) { mostAvailablePath = info; diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java index 1f048fca76c09..839d7dbe80c84 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java @@ -90,16 +90,36 @@ static long sizeOfRelocatingShards(RoutingNode node, RoutingAllocation allocatio boolean subtractShardsMovingAway, String dataPath) { ClusterInfo clusterInfo = allocation.clusterInfo(); long totalSize = 0; - for (ShardRouting routing : node.shardsWithState(ShardRoutingState.RELOCATING, ShardRoutingState.INITIALIZING)) { - String actualPath = clusterInfo.getDataPath(routing); - if (dataPath.equals(actualPath)) { - if (routing.initializing() && routing.relocatingNodeId() != null) { - totalSize += getExpectedShardSize(routing, allocation, 0); - } else if (subtractShardsMovingAway && routing.relocating()) { + + for (ShardRouting routing : node.shardsWithState(ShardRoutingState.INITIALIZING)) { + if (routing.relocatingNodeId() == null) { + // in practice the only initializing-but-not-relocating shards with a nonzero expected shard size will be ones created + // by a resize (shrink/split/clone) operation which we expect to happen using hard links, so they shouldn't be taking + // any additional space and can be ignored here + continue; + } + + final String actualPath = clusterInfo.getDataPath(routing); + // if we don't yet know the actual path of the incoming shard then conservatively assume it's going to the path with the least + // free space + if (actualPath == null || actualPath.equals(dataPath)) { + totalSize += getExpectedShardSize(routing, allocation, 0); + } + } + + if (subtractShardsMovingAway) { + for (ShardRouting routing : node.shardsWithState(ShardRoutingState.RELOCATING)) { + String actualPath = clusterInfo.getDataPath(routing); + if (actualPath == null) { + // we might know the path of this shard from before when it was relocating + actualPath = clusterInfo.getDataPath(routing.cancelRelocation()); + } + if (dataPath.equals(actualPath)) { totalSize -= getExpectedShardSize(routing, allocation, 0); } } } + return totalSize; } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java index 4e1a567d98540..eac201b4f33e8 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java @@ -26,7 +26,6 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.DiskUsage; import org.elasticsearch.cluster.ESAllocationTestCase; -import org.elasticsearch.cluster.MockInternalClusterInfoService.DevNullClusterInfo; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -1028,4 +1027,20 @@ public void logShardStates(ClusterState state) { rn.shardsWithState(RELOCATING), rn.shardsWithState(STARTED)); } + + /** + * ClusterInfo that always reports /dev/null for the shards' data paths. + */ + static class DevNullClusterInfo extends ClusterInfo { + DevNullClusterInfo(ImmutableOpenMap leastAvailableSpaceUsage, + ImmutableOpenMap mostAvailableSpaceUsage, + ImmutableOpenMap shardSizes) { + super(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, null); + } + + @Override + public String getDataPath(ShardRouting shardRouting) { + return "/dev/null"; + } + } } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java index 9e68a7c1927fe..10f3d1f2e0d67 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java @@ -25,7 +25,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.DiskUsage; import org.elasticsearch.cluster.ESAllocationTestCase; -import org.elasticsearch.cluster.MockInternalClusterInfoService.DevNullClusterInfo; +import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDeciderTests.DevNullClusterInfo; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesIT.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesIT.java index 1ad18c1f69f54..821a908e05756 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesIT.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesIT.java @@ -16,135 +16,339 @@ * specific language governing permissions and limitations * under the License. */ - package org.elasticsearch.cluster.routing.allocation.decider; -import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; -import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; -import org.elasticsearch.cluster.ClusterInfo; +import org.elasticsearch.action.admin.indices.stats.ShardStats; +import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.cluster.ClusterInfoService; -import org.elasticsearch.cluster.DiskUsage; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.MockInternalClusterInfoService; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.RoutingNode; -import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Priority; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.env.Environment; +import org.elasticsearch.monitor.fs.FsInfo; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; -import java.util.ArrayList; +import java.nio.file.Path; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; +import static org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING; +import static org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING; +import static org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING; +import static org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING; +import static org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertBlocked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchHits; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.startsWith; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0) public class MockDiskUsagesIT extends ESIntegTestCase { @Override protected Collection> nodePlugins() { - // Use the mock internal cluster info service, which has fake-able disk usages - return Arrays.asList(MockInternalClusterInfoService.TestPlugin.class); + return Collections.singletonList(MockInternalClusterInfoService.TestPlugin.class); + } + + @Override + public Settings indexSettings() { + // ensure that indices do not use custom data paths + return Settings.builder().put(super.indexSettings()).putNull(IndexMetaData.SETTING_DATA_PATH).build(); + } + + private static FsInfo.Path setDiskUsage(FsInfo.Path original, long totalBytes, long freeBytes) { + return new FsInfo.Path(original.getPath(), original.getMount(), totalBytes, freeBytes, freeBytes); } public void testRerouteOccursOnDiskPassingHighWatermark() throws Exception { - List nodes = internalCluster().startNodes(3); + for (int i = 0; i < 3; i++) { + // ensure that each node has a single data path + internalCluster().startNode(Settings.builder().put(Environment.PATH_DATA_SETTING.getKey(), createTempDir())); + } - // Wait for all 3 nodes to be up - assertBusy(() -> { - NodesStatsResponse resp = client().admin().cluster().prepareNodesStats().get(); - assertThat(resp.getNodes().size(), equalTo(3)); - }); + final List nodeIds = StreamSupport.stream(client().admin().cluster().prepareState().get().getState() + .getRoutingNodes().spliterator(), false).map(RoutingNode::nodeId).collect(Collectors.toList()); + + final MockInternalClusterInfoService clusterInfoService = getMockInternalClusterInfoService(); + clusterInfoService.setUpdateFrequency(TimeValue.timeValueMillis(200)); + clusterInfoService.onMaster(); + + // prevent any effects from in-flight recoveries, since we are only simulating a 100-byte disk + clusterInfoService.shardSizeFunction = shardRouting -> 0L; - // Start with all nodes at 50% usage - final MockInternalClusterInfoService cis = (MockInternalClusterInfoService) - internalCluster().getInstance(ClusterInfoService.class, internalCluster().getMasterName()); - cis.setUpdateFrequency(TimeValue.timeValueMillis(200)); - cis.onMaster(); - cis.setN1Usage(nodes.get(0), new DiskUsage(nodes.get(0), "n1", "/dev/null", 100, 50)); - cis.setN2Usage(nodes.get(1), new DiskUsage(nodes.get(1), "n2", "/dev/null", 100, 50)); - cis.setN3Usage(nodes.get(2), new DiskUsage(nodes.get(2), "n3", "/dev/null", 100, 50)); + // start with all nodes below the watermark + clusterInfoService.diskUsageFunction = (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 100, between(10, 100)); final boolean watermarkBytes = randomBoolean(); // we have to consistently use bytes or percentage for the disk watermark settings - client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder() - .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), watermarkBytes ? "20b" : "80%") - .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), watermarkBytes ? "10b" : "90%") - .put( - DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), - watermarkBytes ? "0b" : "100%") - .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.getKey(), "1ms")).get(); + assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder() + .put(CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), watermarkBytes ? "10b" : "90%") + .put(CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), watermarkBytes ? "10b" : "90%") + .put(CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), watermarkBytes ? "0b" : "100%") + .put(CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.getKey(), "1ms"))); // Create an index with 10 shards so we can check allocation for it - prepareCreate("test").setSettings(Settings.builder() - .put("number_of_shards", 10) - .put("number_of_replicas", 0) - .put("index.routing.allocation.exclude._name", "")).get(); + assertAcked(prepareCreate("test").setSettings(Settings.builder().put("number_of_shards", 10).put("number_of_replicas", 0))); ensureGreen("test"); - // Block until the "fake" cluster info is retrieved at least once assertBusy(() -> { - ClusterInfo info = cis.getClusterInfo(); - logger.info("--> got: {} nodes", info.getNodeLeastAvailableDiskUsages().size()); - assertThat(info.getNodeLeastAvailableDiskUsages().size(), greaterThan(0)); + final Map shardCountByNodeId = getShardCountByNodeId(); + assertThat("node0 has at least 3 shards", shardCountByNodeId.get(nodeIds.get(0)), greaterThanOrEqualTo(3)); + assertThat("node1 has at least 3 shards", shardCountByNodeId.get(nodeIds.get(1)), greaterThanOrEqualTo(3)); + assertThat("node2 has at least 3 shards", shardCountByNodeId.get(nodeIds.get(2)), greaterThanOrEqualTo(3)); }); - final List realNodeNames = new ArrayList<>(); - ClusterStateResponse resp = client().admin().cluster().prepareState().get(); - Iterator iter = resp.getState().getRoutingNodes().iterator(); - while (iter.hasNext()) { - RoutingNode node = iter.next(); - realNodeNames.add(node.nodeId()); - logger.info("--> node {} has {} shards", - node.nodeId(), resp.getState().getRoutingNodes().node(node.nodeId()).numberOfOwningShards()); + // move node2 above high watermark + clusterInfoService.diskUsageFunction = (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 100, + discoveryNode.getId().equals(nodeIds.get(2)) ? between(0, 9) : between(10, 100)); + + logger.info("--> waiting for shards to relocate off node [{}]", nodeIds.get(2)); + + assertBusy(() -> { + final Map shardCountByNodeId = getShardCountByNodeId(); + assertThat("node0 has 5 shards", shardCountByNodeId.get(nodeIds.get(0)), equalTo(5)); + assertThat("node1 has 5 shards", shardCountByNodeId.get(nodeIds.get(1)), equalTo(5)); + assertThat("node2 has 0 shards", shardCountByNodeId.get(nodeIds.get(2)), equalTo(0)); + }); + + // move all nodes below watermark again + clusterInfoService.diskUsageFunction = (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 100, between(10, 100)); + + logger.info("--> waiting for shards to rebalance back onto node [{}]", nodeIds.get(2)); + + assertBusy(() -> { + final Map shardCountByNodeId = getShardCountByNodeId(); + assertThat("node0 has at least 3 shards", shardCountByNodeId.get(nodeIds.get(0)), greaterThanOrEqualTo(3)); + assertThat("node1 has at least 3 shards", shardCountByNodeId.get(nodeIds.get(1)), greaterThanOrEqualTo(3)); + assertThat("node2 has at least 3 shards", shardCountByNodeId.get(nodeIds.get(2)), greaterThanOrEqualTo(3)); + }); + } + + public void testOnlyMovesEnoughShardsToDropBelowHighWatermark() throws Exception { + for (int i = 0; i < 3; i++) { + // ensure that each node has a single data path + internalCluster().startNode(Settings.builder().put(Environment.PATH_DATA_SETTING.getKey(), createTempDir())); } - // Update the disk usages so one node has now passed the high watermark - cis.setN1Usage(realNodeNames.get(0), new DiskUsage(nodes.get(0), "n1", "_na_", 100, 50)); - cis.setN2Usage(realNodeNames.get(1), new DiskUsage(nodes.get(1), "n2", "_na_", 100, 50)); - cis.setN3Usage(realNodeNames.get(2), new DiskUsage(nodes.get(2), "n3", "_na_", 100, 0)); // nothing free on node3 + final MockInternalClusterInfoService clusterInfoService = getMockInternalClusterInfoService(); - // Retrieve the count of shards on each node - final Map nodesToShardCount = new HashMap<>(); + final AtomicReference masterAppliedClusterState = new AtomicReference<>(); + internalCluster().getCurrentMasterNodeInstance(ClusterService.class).addListener(event -> { + masterAppliedClusterState.set(event.state()); + clusterInfoService.refresh(); // so that a subsequent reroute sees disk usage according to the current state + }); + + // shards are 1 byte large + clusterInfoService.shardSizeFunction = shardRouting -> 1L; + + // start with all nodes below the watermark + clusterInfoService.diskUsageFunction = (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 1000L, 1000L); + + assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder() + .put(CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "90%") + .put(CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "90%") + .put(CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), "100%") + .put(CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.getKey(), "1ms"))); + + final List nodeIds = StreamSupport.stream(client().admin().cluster().prepareState().get().getState() + .getRoutingNodes().spliterator(), false).map(RoutingNode::nodeId).collect(Collectors.toList()); + + assertAcked(prepareCreate("test").setSettings(Settings.builder().put("number_of_shards", 6).put("number_of_replicas", 0))); + + ensureGreen("test"); assertBusy(() -> { - ClusterStateResponse resp12 = client().admin().cluster().prepareState().get(); - Iterator iter12 = resp12.getState().getRoutingNodes().iterator(); - while (iter12.hasNext()) { - RoutingNode node = iter12.next(); - logger.info("--> node {} has {} shards", - node.nodeId(), resp12.getState().getRoutingNodes().node(node.nodeId()).numberOfOwningShards()); - nodesToShardCount.put(node.nodeId(), resp12.getState().getRoutingNodes().node(node.nodeId()).numberOfOwningShards()); - } - assertThat("node1 has 5 shards", nodesToShardCount.get(realNodeNames.get(0)), equalTo(5)); - assertThat("node2 has 5 shards", nodesToShardCount.get(realNodeNames.get(1)), equalTo(5)); - assertThat("node3 has 0 shards", nodesToShardCount.get(realNodeNames.get(2)), equalTo(0)); + final Map shardCountByNodeId = getShardCountByNodeId(); + assertThat("node0 has 2 shards", shardCountByNodeId.get(nodeIds.get(0)), equalTo(2)); + assertThat("node1 has 2 shards", shardCountByNodeId.get(nodeIds.get(1)), equalTo(2)); + assertThat("node2 has 2 shards", shardCountByNodeId.get(nodeIds.get(2)), equalTo(2)); + }); + + // disable rebalancing, or else we might move too many shards away and then rebalance them back again + assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder() + .put(CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), EnableAllocationDecider.Rebalance.NONE))); + + // node2 suddenly has 99 bytes free, less than 10%, but moving one shard is enough to bring it up to 100 bytes free: + clusterInfoService.diskUsageFunction = (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 1000L, + discoveryNode.getId().equals(nodeIds.get(2)) + ? 101L - masterAppliedClusterState.get().getRoutingNodes().node(nodeIds.get(2)).numberOfOwningShards() + : 1000L); + + clusterInfoService.refresh(); + + logger.info("--> waiting for shards to relocate off node [{}]", nodeIds.get(2)); + + // must wait for relocation to start + assertBusy(() -> assertThat("node2 has 1 shard", getShardCountByNodeId().get(nodeIds.get(2)), equalTo(1))); + + // ensure that relocations finished without moving any more shards + ensureGreen("test"); + assertThat("node2 has 1 shard", getShardCountByNodeId().get(nodeIds.get(2)), equalTo(1)); + } + + public void testDoesNotExceedLowWatermarkWhenRebalancing() throws Exception { + for (int i = 0; i < 3; i++) { + // ensure that each node has a single data path + internalCluster().startNode(Settings.builder().put(Environment.PATH_DATA_SETTING.getKey(), createTempDir())); + } + + final AtomicReference masterAppliedClusterState = new AtomicReference<>(); + + final MockInternalClusterInfoService clusterInfoService = getMockInternalClusterInfoService(); + + final List nodeIds = StreamSupport.stream(client().admin().cluster().prepareState().get().getState() + .getRoutingNodes().spliterator(), false).map(RoutingNode::nodeId).collect(Collectors.toList()); + + internalCluster().getCurrentMasterNodeInstance(ClusterService.class).addListener(event -> { + assertThat(event.state().getRoutingNodes().node(nodeIds.get(2)).size(), lessThanOrEqualTo(1)); + masterAppliedClusterState.set(event.state()); + clusterInfoService.refresh(); // so that a subsequent reroute sees disk usage according to the current state }); - // Update the disk usages so one node is now back under the high watermark - cis.setN1Usage(realNodeNames.get(0), new DiskUsage(nodes.get(0), "n1", "_na_", 100, 50)); - cis.setN2Usage(realNodeNames.get(1), new DiskUsage(nodes.get(1), "n2", "_na_", 100, 50)); - cis.setN3Usage(realNodeNames.get(2), new DiskUsage(nodes.get(2), "n3", "_na_", 100, 50)); // node3 has free space now + assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder() + .put(CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "85%") + .put(CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "100%") + .put(CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), "100%"))); + + // shards are 1 byte large + clusterInfoService.shardSizeFunction = shardRouting -> 1L; - // Retrieve the count of shards on each node - nodesToShardCount.clear(); + // node 2 only has space for one shard + clusterInfoService.diskUsageFunction = (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 1000L, + discoveryNode.getId().equals(nodeIds.get(2)) + ? 150L - masterAppliedClusterState.get().getRoutingNodes().node(nodeIds.get(2)).numberOfOwningShards() + : 1000L); + + assertAcked(prepareCreate("test").setSettings(Settings.builder() + .put("number_of_shards", 6) + .put("number_of_replicas", 0) + .put(IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING.getConcreteSettingForNamespace("_id").getKey(), nodeIds.get(2)))); + ensureGreen("test"); assertBusy(() -> { - ClusterStateResponse resp1 = client().admin().cluster().prepareState().get(); - Iterator iter1 = resp1.getState().getRoutingNodes().iterator(); - while (iter1.hasNext()) { - RoutingNode node = iter1.next(); - logger.info("--> node {} has {} shards", - node.nodeId(), resp1.getState().getRoutingNodes().node(node.nodeId()).numberOfOwningShards()); - nodesToShardCount.put(node.nodeId(), resp1.getState().getRoutingNodes().node(node.nodeId()).numberOfOwningShards()); + final Map shardCountByNodeId = getShardCountByNodeId(); + assertThat("node0 has 3 shards", shardCountByNodeId.get(nodeIds.get(0)), equalTo(3)); + assertThat("node1 has 3 shards", shardCountByNodeId.get(nodeIds.get(1)), equalTo(3)); + assertThat("node2 has 0 shards", shardCountByNodeId.get(nodeIds.get(2)), equalTo(0)); + }); + + assertAcked(client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder() + .putNull(IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING.getConcreteSettingForNamespace("_id").getKey()))); + + logger.info("--> waiting for shards to relocate onto node [{}]", nodeIds.get(2)); + + ensureGreen("test"); + assertThat("node2 has 1 shard", getShardCountByNodeId().get(nodeIds.get(2)), equalTo(1)); + } + + public void testMovesShardsOffSpecificDataPathAboveWatermark() throws Exception { + + // start one node with two data paths + final Path pathOverWatermark = createTempDir(); + final Settings.Builder twoPathSettings = Settings.builder(); + if (randomBoolean()) { + twoPathSettings.putList(Environment.PATH_DATA_SETTING.getKey(), createTempDir().toString(), pathOverWatermark.toString()); + } else { + twoPathSettings.putList(Environment.PATH_DATA_SETTING.getKey(), pathOverWatermark.toString(), createTempDir().toString()); + } + internalCluster().startNode(twoPathSettings); + final String nodeWithTwoPaths = client().admin().cluster().prepareNodesInfo().get().getNodes().get(0).getNode().getId(); + + // other two nodes have one data path each + internalCluster().startNode(Settings.builder().put(Environment.PATH_DATA_SETTING.getKey(), createTempDir())); + internalCluster().startNode(Settings.builder().put(Environment.PATH_DATA_SETTING.getKey(), createTempDir())); + + final MockInternalClusterInfoService clusterInfoService = getMockInternalClusterInfoService(); + + // prevent any effects from in-flight recoveries, since we are only simulating a 100-byte disk + clusterInfoService.shardSizeFunction = shardRouting -> 0L; + + // start with all paths below the watermark + clusterInfoService.diskUsageFunction = (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 100, between(10, 100)); + + assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder() + .put(CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "90%") + .put(CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "90%") + .put(CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), "100%"))); + + final List nodeIds = StreamSupport.stream(client().admin().cluster().prepareState().get().getState() + .getRoutingNodes().spliterator(), false).map(RoutingNode::nodeId).collect(Collectors.toList()); + + assertAcked(prepareCreate("test").setSettings(Settings.builder().put("number_of_shards", 6).put("number_of_replicas", 0))); + + ensureGreen("test"); + + { + final Map shardCountByNodeId = getShardCountByNodeId(); + assertThat("node0 has 2 shards", shardCountByNodeId.get(nodeIds.get(0)), equalTo(2)); + assertThat("node1 has 2 shards", shardCountByNodeId.get(nodeIds.get(1)), equalTo(2)); + assertThat("node2 has 2 shards", shardCountByNodeId.get(nodeIds.get(2)), equalTo(2)); + } + + final long shardsOnGoodPath = Arrays.stream(client().admin().indices().prepareStats("test").get().getShards()) + .filter(shardStats -> shardStats.getShardRouting().currentNodeId().equals(nodeWithTwoPaths) + && shardStats.getDataPath().startsWith(pathOverWatermark.toString()) == false).count(); + logger.info("--> shards on good path: [{}]", shardsOnGoodPath); + + // one of the paths on node0 suddenly exceeds the high watermark + clusterInfoService.diskUsageFunction = (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 100L, + fsInfoPath.getPath().startsWith(pathOverWatermark.toString()) ? between(0, 9) : between(10, 100)); + + // disable rebalancing, or else we might move shards back onto the over-full path since we're not faking that + assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder() + .put(CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), EnableAllocationDecider.Rebalance.NONE))); + + clusterInfoService.refresh(); + + logger.info("--> waiting for shards to relocate off path [{}]", pathOverWatermark); + + assertBusy(() -> { + for (final ShardStats shardStats : client().admin().indices().prepareStats("test").get().getShards()) { + assertThat(shardStats.getDataPath(), not(startsWith(pathOverWatermark.toString()))); } - assertThat("node1 has at least 3 shards", nodesToShardCount.get(realNodeNames.get(0)), greaterThanOrEqualTo(3)); - assertThat("node2 has at least 3 shards", nodesToShardCount.get(realNodeNames.get(1)), greaterThanOrEqualTo(3)); - assertThat("node3 has at least 3 shards", nodesToShardCount.get(realNodeNames.get(2)), greaterThanOrEqualTo(3)); }); + + ensureGreen("test"); + + for (final ShardStats shardStats : client().admin().indices().prepareStats("test").get().getShards()) { + assertThat(shardStats.getDataPath(), not(startsWith(pathOverWatermark.toString()))); + } + + assertThat("should not have moved any shards off of the path that wasn't too full", + Arrays.stream(client().admin().indices().prepareStats("test").get().getShards()) + .filter(shardStats -> shardStats.getShardRouting().currentNodeId().equals(nodeWithTwoPaths) + && shardStats.getDataPath().startsWith(pathOverWatermark.toString()) == false).count(), equalTo(shardsOnGoodPath)); + } + + private Map getShardCountByNodeId() { + final Map shardCountByNodeId = new HashMap<>(); + final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); + for (final RoutingNode node : clusterState.getRoutingNodes()) { + logger.info("----> node {} has {} shards", + node.nodeId(), clusterState.getRoutingNodes().node(node.nodeId()).numberOfOwningShards()); + shardCountByNodeId.put(node.nodeId(), clusterState.getRoutingNodes().node(node.nodeId()).numberOfOwningShards()); + } + return shardCountByNodeId; } + + private MockInternalClusterInfoService getMockInternalClusterInfoService() { + return (MockInternalClusterInfoService) internalCluster().getCurrentMasterNodeInstance(ClusterInfoService.class); + } + } diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java b/test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java index 037bb54491920..a6299dc1b7747 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java @@ -18,112 +18,82 @@ */ package org.elasticsearch.cluster; -import java.util.Arrays; -import java.util.Collections; -import java.util.concurrent.CountDownLatch; -import java.util.function.Consumer; - -import org.elasticsearch.Version; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; -import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; -import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.monitor.fs.FsInfo; import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; -import static java.util.Collections.emptyMap; -import static java.util.Collections.emptySet; +import java.util.List; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; -/** - * Fake ClusterInfoService class that allows updating the nodes stats disk - * usage with fake values - */ public class MockInternalClusterInfoService extends InternalClusterInfoService { /** This is a marker plugin used to trigger MockNode to use this mock info service. */ public static class TestPlugin extends Plugin {} - private final ClusterName clusterName; - private volatile NodeStats[] stats = new NodeStats[3]; + @Nullable // if no fakery should take place + public volatile Function shardSizeFunction; - /** Create a fake NodeStats for the given node and usage */ - public static NodeStats makeStats(String nodeName, DiskUsage usage) { - FsInfo.Path[] paths = new FsInfo.Path[1]; - FsInfo.Path path = new FsInfo.Path("/dev/null", null, - usage.getTotalBytes(), usage.getFreeBytes(), usage.getFreeBytes()); - paths[0] = path; - FsInfo fsInfo = new FsInfo(System.currentTimeMillis(), null, paths); - return new NodeStats( - new DiscoveryNode(nodeName, ESTestCase.buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT), - System.currentTimeMillis(), - null, null, null, null, null, - fsInfo, - null, null, null, - null, null, null, null); - } + @Nullable // if no fakery should take place + public volatile BiFunction diskUsageFunction; public MockInternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, NodeClient client, Consumer listener) { super(settings, clusterService, threadPool, client, listener); - this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings); - stats[0] = makeStats("node_t1", new DiskUsage("node_t1", "n1", "/dev/null", 100, 100)); - stats[1] = makeStats("node_t2", new DiskUsage("node_t2", "n2", "/dev/null", 100, 100)); - stats[2] = makeStats("node_t3", new DiskUsage("node_t3", "n3", "/dev/null", 100, 100)); - } - - public void setN1Usage(String nodeName, DiskUsage newUsage) { - stats[0] = makeStats(nodeName, newUsage); - } - - public void setN2Usage(String nodeName, DiskUsage newUsage) { - stats[1] = makeStats(nodeName, newUsage); - } - - public void setN3Usage(String nodeName, DiskUsage newUsage) { - stats[2] = makeStats(nodeName, newUsage); } @Override - public CountDownLatch updateNodeStats(final ActionListener listener) { - NodesStatsResponse response = new NodesStatsResponse(clusterName, Arrays.asList(stats), Collections.emptyList()); - listener.onResponse(response); - return new CountDownLatch(0); + public ClusterInfo getClusterInfo() { + final ClusterInfo clusterInfo = super.getClusterInfo(); + return new SizeFakingClusterInfo(clusterInfo); } @Override - public CountDownLatch updateIndicesStats(final ActionListener listener) { - // Not used, so noop - return new CountDownLatch(0); - } + List adjustNodesStats(List nodesStats) { + final BiFunction diskUsageFunction = this.diskUsageFunction; + if (diskUsageFunction == null) { + return nodesStats; + } - @Override - public ClusterInfo getClusterInfo() { - ClusterInfo clusterInfo = super.getClusterInfo(); - return new DevNullClusterInfo(clusterInfo.getNodeLeastAvailableDiskUsages(), - clusterInfo.getNodeMostAvailableDiskUsages(), clusterInfo.shardSizes); + return nodesStats.stream().map(nodeStats -> { + final DiscoveryNode discoveryNode = nodeStats.getNode(); + final FsInfo oldFsInfo = nodeStats.getFs(); + return new NodeStats(discoveryNode, nodeStats.getTimestamp(), nodeStats.getIndices(), nodeStats.getOs(), + nodeStats.getProcess(), nodeStats.getJvm(), nodeStats.getThreadPool(), new FsInfo(oldFsInfo.getTimestamp(), + oldFsInfo.getIoStats(), + StreamSupport.stream(oldFsInfo.spliterator(), false) + .map(fsInfoPath -> diskUsageFunction.apply(discoveryNode, fsInfoPath)) + .toArray(FsInfo.Path[]::new)), nodeStats.getTransport(), + nodeStats.getHttp(), nodeStats.getBreaker(), nodeStats.getScriptStats(), nodeStats.getDiscoveryStats(), + nodeStats.getIngestStats(), nodeStats.getAdaptiveSelectionStats()); + }).collect(Collectors.toList()); } - /** - * ClusterInfo that always points to DevNull. - */ - public static class DevNullClusterInfo extends ClusterInfo { - public DevNullClusterInfo(ImmutableOpenMap leastAvailableSpaceUsage, - ImmutableOpenMap mostAvailableSpaceUsage, ImmutableOpenMap shardSizes) { - super(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, null); + class SizeFakingClusterInfo extends ClusterInfo { + SizeFakingClusterInfo(ClusterInfo delegate) { + super(delegate.getNodeLeastAvailableDiskUsages(), delegate.getNodeMostAvailableDiskUsages(), + delegate.shardSizes, delegate.routingToDataPath); } @Override - public String getDataPath(ShardRouting shardRouting) { - return "/dev/null"; + public Long getShardSize(ShardRouting shardRouting) { + final Function shardSizeFunction = MockInternalClusterInfoService.this.shardSizeFunction; + if (shardSizeFunction == null) { + return super.getShardSize(shardRouting); + } + + return shardSizeFunction.apply(shardRouting); } } From 025ec798d3a79d6f708e2534c9c9c5f22505ed1b Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 29 Aug 2019 13:44:23 +0100 Subject: [PATCH 2/2] tyvm --- .../cluster/routing/allocation/decider/MockDiskUsagesIT.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesIT.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesIT.java index 821a908e05756..dd33075271cb1 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesIT.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesIT.java @@ -19,15 +19,12 @@ package org.elasticsearch.cluster.routing.allocation.decider; import org.elasticsearch.action.admin.indices.stats.ShardStats; -import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.cluster.ClusterInfoService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.MockInternalClusterInfoService; -import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.Priority; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.env.Environment; @@ -52,8 +49,6 @@ import static org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING; import static org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertBlocked; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchHits; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.lessThanOrEqualTo;