diff --git a/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java b/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java index 1a5d0e64fcd2c..d07897199ae8e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java +++ b/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java @@ -47,9 +47,8 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ReceiveTimeoutTransportException; -import java.util.ArrayList; -import java.util.Collections; import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -88,7 +87,7 @@ public class InternalClusterInfoService implements ClusterInfoService, LocalNode private final ClusterService clusterService; private final ThreadPool threadPool; private final NodeClient client; - private final List> listeners = Collections.synchronizedList(new ArrayList<>(1)); + private final List> listeners = new CopyOnWriteArrayList<>(); public InternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, NodeClient client) { this.leastAvailableSpaceUsages = ImmutableOpenMap.of(); @@ -275,6 +274,11 @@ private void maybeRefresh() { } } + // allow tests to adjust the node stats on receipt + List adjustNodesStats(List nodeStats) { + return nodeStats; + } + /** * Refreshes the ClusterInfo in a blocking fashion */ @@ -284,12 +288,13 @@ public final ClusterInfo refresh() { } final CountDownLatch nodeLatch = updateNodeStats(new ActionListener() { @Override - public void onResponse(NodesStatsResponse nodeStatses) { - ImmutableOpenMap.Builder newLeastAvaiableUsages = ImmutableOpenMap.builder(); - ImmutableOpenMap.Builder newMostAvaiableUsages = ImmutableOpenMap.builder(); - fillDiskUsagePerNode(logger, nodeStatses.getNodes(), newLeastAvaiableUsages, newMostAvaiableUsages); - leastAvailableSpaceUsages = newLeastAvaiableUsages.build(); - mostAvailableSpaceUsages = newMostAvaiableUsages.build(); + public void onResponse(NodesStatsResponse nodesStatsResponse) { + ImmutableOpenMap.Builder leastAvailableUsagesBuilder = ImmutableOpenMap.builder(); + ImmutableOpenMap.Builder mostAvailableUsagesBuilder = ImmutableOpenMap.builder(); + fillDiskUsagePerNode(logger, adjustNodesStats(nodesStatsResponse.getNodes()), + leastAvailableUsagesBuilder, mostAvailableUsagesBuilder); + leastAvailableSpaceUsages = leastAvailableUsagesBuilder.build(); + mostAvailableSpaceUsages = mostAvailableUsagesBuilder.build(); } @Override @@ -402,7 +407,7 @@ static void fillDiskUsagePerNode(Logger logger, List nodeStatsArray, if (leastAvailablePath == null) { assert mostAvailablePath == null; mostAvailablePath = leastAvailablePath = info; - } else if (leastAvailablePath.getAvailable().getBytes() > info.getAvailable().getBytes()){ + } else if (leastAvailablePath.getAvailable().getBytes() > info.getAvailable().getBytes()) { leastAvailablePath = info; } else if (mostAvailablePath.getAvailable().getBytes() < info.getAvailable().getBytes()) { mostAvailablePath = info; diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java index 0dad13d87a15b..fcd907a60fd0c 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java @@ -221,6 +221,8 @@ public void onNewInfo(ClusterInfo info) { .collect(Collectors.toSet()); if (indicesToAutoRelease.isEmpty() == false) { + logger.info("releasing read-only block on indices " + indicesToAutoRelease + + " since they are now allocated to nodes with sufficient disk space"); updateIndicesReadOnly(indicesToAutoRelease, listener, false); } else { listener.onResponse(null); diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java index 0838999c4f367..6c99cfa8ee056 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java @@ -90,16 +90,36 @@ static long sizeOfRelocatingShards(RoutingNode node, RoutingAllocation allocatio boolean subtractShardsMovingAway, String dataPath) { ClusterInfo clusterInfo = allocation.clusterInfo(); long totalSize = 0; - for (ShardRouting routing : node.shardsWithState(ShardRoutingState.RELOCATING, ShardRoutingState.INITIALIZING)) { - String actualPath = clusterInfo.getDataPath(routing); - if (dataPath.equals(actualPath)) { - if (routing.initializing() && routing.relocatingNodeId() != null) { - totalSize += getExpectedShardSize(routing, allocation, 0); - } else if (subtractShardsMovingAway && routing.relocating()) { + + for (ShardRouting routing : node.shardsWithState(ShardRoutingState.INITIALIZING)) { + if (routing.relocatingNodeId() == null) { + // in practice the only initializing-but-not-relocating shards with a nonzero expected shard size will be ones created + // by a resize (shrink/split/clone) operation which we expect to happen using hard links, so they shouldn't be taking + // any additional space and can be ignored here + continue; + } + + final String actualPath = clusterInfo.getDataPath(routing); + // if we don't yet know the actual path of the incoming shard then conservatively assume it's going to the path with the least + // free space + if (actualPath == null || actualPath.equals(dataPath)) { + totalSize += getExpectedShardSize(routing, allocation, 0); + } + } + + if (subtractShardsMovingAway) { + for (ShardRouting routing : node.shardsWithState(ShardRoutingState.RELOCATING)) { + String actualPath = clusterInfo.getDataPath(routing); + if (actualPath == null) { + // we might know the path of this shard from before when it was relocating + actualPath = clusterInfo.getDataPath(routing.cancelRelocation()); + } + if (dataPath.equals(actualPath)) { totalSize -= getExpectedShardSize(routing, allocation, 0); } } } + return totalSize; } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java index 4c48ce7b36068..f1236cfd252a6 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java @@ -26,7 +26,6 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.DiskUsage; import org.elasticsearch.cluster.ESAllocationTestCase; -import org.elasticsearch.cluster.MockInternalClusterInfoService.DevNullClusterInfo; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -1002,4 +1001,20 @@ public void logShardStates(ClusterState state) { rn.shardsWithState(RELOCATING), rn.shardsWithState(STARTED)); } + + /** + * ClusterInfo that always reports /dev/null for the shards' data paths. + */ + static class DevNullClusterInfo extends ClusterInfo { + DevNullClusterInfo(ImmutableOpenMap leastAvailableSpaceUsage, + ImmutableOpenMap mostAvailableSpaceUsage, + ImmutableOpenMap shardSizes) { + super(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, null); + } + + @Override + public String getDataPath(ShardRouting shardRouting) { + return "/dev/null"; + } + } } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java index 8f790b41dad04..3f1975f35369a 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java @@ -25,7 +25,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.DiskUsage; import org.elasticsearch.cluster.ESAllocationTestCase; -import org.elasticsearch.cluster.MockInternalClusterInfoService.DevNullClusterInfo; +import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDeciderTests.DevNullClusterInfo; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesIT.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesIT.java index c7e47e98de3cb..389c54bb128c7 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesIT.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesIT.java @@ -16,191 +16,179 @@ * specific language governing permissions and limitations * under the License. */ - package org.elasticsearch.cluster.routing.allocation.decider; -import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; -import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.support.WriteRequest; -import org.elasticsearch.cluster.ClusterInfo; import org.elasticsearch.cluster.ClusterInfoService; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.DiskUsage; import org.elasticsearch.cluster.MockInternalClusterInfoService; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.RoutingNode; -import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.env.Environment; +import org.elasticsearch.monitor.fs.FsInfo; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; -import java.util.ArrayList; +import java.nio.file.Path; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; +import static org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING; +import static org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING; +import static org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING; +import static org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING; +import static org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertBlocked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchHits; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.startsWith; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0) public class MockDiskUsagesIT extends ESIntegTestCase { @Override protected Collection> nodePlugins() { - // Use the mock internal cluster info service, which has fake-able disk usages return Collections.singletonList(MockInternalClusterInfoService.TestPlugin.class); } + @Override + public Settings indexSettings() { + // ensure that indices do not use custom data paths + return Settings.builder().put(super.indexSettings()).putNull(IndexMetaData.SETTING_DATA_PATH).build(); + } + + private static FsInfo.Path setDiskUsage(FsInfo.Path original, long totalBytes, long freeBytes) { + return new FsInfo.Path(original.getPath(), original.getMount(), totalBytes, freeBytes, freeBytes); + } + public void testRerouteOccursOnDiskPassingHighWatermark() throws Exception { - List nodes = internalCluster().startNodes(3); + for (int i = 0; i < 3; i++) { + // ensure that each node has a single data path + internalCluster().startNode(Settings.builder().put(Environment.PATH_DATA_SETTING.getKey(), createTempDir())); + } - // Start with all nodes at 50% usage - final MockInternalClusterInfoService cis = (MockInternalClusterInfoService) - internalCluster().getInstance(ClusterInfoService.class, internalCluster().getMasterName()); - cis.setUpdateFrequency(TimeValue.timeValueMillis(200)); - cis.onMaster(); - cis.setN1Usage(nodes.get(0), new DiskUsage(nodes.get(0), "n1", "/dev/null", 100, 50)); - cis.setN2Usage(nodes.get(1), new DiskUsage(nodes.get(1), "n2", "/dev/null", 100, 50)); - cis.setN3Usage(nodes.get(2), new DiskUsage(nodes.get(2), "n3", "/dev/null", 100, 50)); + final List nodeIds = StreamSupport.stream(client().admin().cluster().prepareState().get().getState() + .getRoutingNodes().spliterator(), false).map(RoutingNode::nodeId).collect(Collectors.toList()); + + final MockInternalClusterInfoService clusterInfoService = getMockInternalClusterInfoService(); + clusterInfoService.setUpdateFrequency(TimeValue.timeValueMillis(200)); + clusterInfoService.onMaster(); + + // prevent any effects from in-flight recoveries, since we are only simulating a 100-byte disk + clusterInfoService.shardSizeFunction = shardRouting -> 0L; + + // start with all nodes below the watermark + clusterInfoService.diskUsageFunction = (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 100, between(10, 100)); final boolean watermarkBytes = randomBoolean(); // we have to consistently use bytes or percentage for the disk watermark settings - client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder() - .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), watermarkBytes ? "20b" : "80%") - .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), watermarkBytes ? "10b" : "90%") - .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), - watermarkBytes ? "0b" : "100%") - .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.getKey(), "1ms")).get(); + assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder() + .put(CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), watermarkBytes ? "10b" : "90%") + .put(CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), watermarkBytes ? "10b" : "90%") + .put(CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), watermarkBytes ? "0b" : "100%") + .put(CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.getKey(), "1ms"))); // Create an index with 10 shards so we can check allocation for it - prepareCreate("test").setSettings(Settings.builder() - .put("number_of_shards", 10) - .put("number_of_replicas", 0)).get(); + assertAcked(prepareCreate("test").setSettings(Settings.builder().put("number_of_shards", 10).put("number_of_replicas", 0))); ensureGreen("test"); - // Block until the "fake" cluster info is retrieved at least once assertBusy(() -> { - final ClusterInfo info = cis.getClusterInfo(); - logger.info("--> got: {} nodes", info.getNodeLeastAvailableDiskUsages().size()); - assertThat(info.getNodeLeastAvailableDiskUsages().size(), greaterThan(0)); + final Map shardCountByNodeId = getShardCountByNodeId(); + assertThat("node0 has at least 3 shards", shardCountByNodeId.get(nodeIds.get(0)), greaterThanOrEqualTo(3)); + assertThat("node1 has at least 3 shards", shardCountByNodeId.get(nodeIds.get(1)), greaterThanOrEqualTo(3)); + assertThat("node2 has at least 3 shards", shardCountByNodeId.get(nodeIds.get(2)), greaterThanOrEqualTo(3)); }); - final List realNodeNames = new ArrayList<>(); - { - final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); - for (final RoutingNode node : clusterState.getRoutingNodes()) { - realNodeNames.add(node.nodeId()); - logger.info("--> node {} has {} shards", - node.nodeId(), clusterState.getRoutingNodes().node(node.nodeId()).numberOfOwningShards()); - } - } - - // Update the disk usages so one node has now passed the high watermark - cis.setN1Usage(realNodeNames.get(0), new DiskUsage(nodes.get(0), "n1", "_na_", 100, 50)); - cis.setN2Usage(realNodeNames.get(1), new DiskUsage(nodes.get(1), "n2", "_na_", 100, 50)); - cis.setN3Usage(realNodeNames.get(2), new DiskUsage(nodes.get(2), "n3", "_na_", 100, 0)); // nothing free on node3 + // move node2 above high watermark + clusterInfoService.diskUsageFunction = (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 100, + discoveryNode.getId().equals(nodeIds.get(2)) ? between(0, 9) : between(10, 100)); - logger.info("--> waiting for shards to relocate off node [{}]", realNodeNames.get(2)); + logger.info("--> waiting for shards to relocate off node [{}]", nodeIds.get(2)); assertBusy(() -> { - final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); - final Map nodesToShardCount = new HashMap<>(); - for (final RoutingNode node : clusterState.getRoutingNodes()) { - logger.info("--> node {} has {} shards", - node.nodeId(), clusterState.getRoutingNodes().node(node.nodeId()).numberOfOwningShards()); - nodesToShardCount.put(node.nodeId(), clusterState.getRoutingNodes().node(node.nodeId()).numberOfOwningShards()); - } - assertThat("node1 has 5 shards", nodesToShardCount.get(realNodeNames.get(0)), equalTo(5)); - assertThat("node2 has 5 shards", nodesToShardCount.get(realNodeNames.get(1)), equalTo(5)); - assertThat("node3 has 0 shards", nodesToShardCount.get(realNodeNames.get(2)), equalTo(0)); + final Map shardCountByNodeId = getShardCountByNodeId(); + assertThat("node0 has 5 shards", shardCountByNodeId.get(nodeIds.get(0)), equalTo(5)); + assertThat("node1 has 5 shards", shardCountByNodeId.get(nodeIds.get(1)), equalTo(5)); + assertThat("node2 has 0 shards", shardCountByNodeId.get(nodeIds.get(2)), equalTo(0)); }); - // Update the disk usages so one node is now back under the high watermark - cis.setN1Usage(realNodeNames.get(0), new DiskUsage(nodes.get(0), "n1", "_na_", 100, 50)); - cis.setN2Usage(realNodeNames.get(1), new DiskUsage(nodes.get(1), "n2", "_na_", 100, 50)); - cis.setN3Usage(realNodeNames.get(2), new DiskUsage(nodes.get(2), "n3", "_na_", 100, 50)); // node3 has free space now + // move all nodes below watermark again + clusterInfoService.diskUsageFunction = (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 100, between(10, 100)); - logger.info("--> waiting for shards to rebalance back onto node [{}]", realNodeNames.get(2)); + logger.info("--> waiting for shards to rebalance back onto node [{}]", nodeIds.get(2)); assertBusy(() -> { - final Map nodesToShardCount = new HashMap<>(); - final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); - for (final RoutingNode node : clusterState.getRoutingNodes()) { - logger.info("--> node {} has {} shards", - node.nodeId(), clusterState.getRoutingNodes().node(node.nodeId()).numberOfOwningShards()); - nodesToShardCount.put(node.nodeId(), clusterState.getRoutingNodes().node(node.nodeId()).numberOfOwningShards()); - } - assertThat("node1 has at least 3 shards", nodesToShardCount.get(realNodeNames.get(0)), greaterThanOrEqualTo(3)); - assertThat("node2 has at least 3 shards", nodesToShardCount.get(realNodeNames.get(1)), greaterThanOrEqualTo(3)); - assertThat("node3 has at least 3 shards", nodesToShardCount.get(realNodeNames.get(2)), greaterThanOrEqualTo(3)); + final Map shardCountByNodeId = getShardCountByNodeId(); + assertThat("node0 has at least 3 shards", shardCountByNodeId.get(nodeIds.get(0)), greaterThanOrEqualTo(3)); + assertThat("node1 has at least 3 shards", shardCountByNodeId.get(nodeIds.get(1)), greaterThanOrEqualTo(3)); + assertThat("node2 has at least 3 shards", shardCountByNodeId.get(nodeIds.get(2)), greaterThanOrEqualTo(3)); }); } public void testAutomaticReleaseOfIndexBlock() throws Exception { - List nodes = internalCluster().startNodes(3); + for (int i = 0; i < 3; i++) { + // ensure that each node has a single data path + internalCluster().startNode(Settings.builder().put(Environment.PATH_DATA_SETTING.getKey(), createTempDir())); + } - // Wait for all 3 nodes to be up - assertBusy(() -> { - NodesStatsResponse resp = client().admin().cluster().prepareNodesStats().get(); - assertThat(resp.getNodes().size(), equalTo(3)); - }); + final List nodeIds = StreamSupport.stream(client().admin().cluster().prepareState().get().getState() + .getRoutingNodes().spliterator(), false).map(RoutingNode::nodeId).collect(Collectors.toList()); - // Start with all nodes at 50% usage - final MockInternalClusterInfoService cis = (MockInternalClusterInfoService) - internalCluster().getInstance(ClusterInfoService.class, internalCluster().getMasterName()); - cis.setUpdateFrequency(TimeValue.timeValueMillis(100)); - cis.onMaster(); - cis.setN1Usage(nodes.get(0), new DiskUsage(nodes.get(0), "n1", "/dev/null", 100, 50)); - cis.setN2Usage(nodes.get(1), new DiskUsage(nodes.get(1), "n2", "/dev/null", 100, 50)); - cis.setN3Usage(nodes.get(2), new DiskUsage(nodes.get(2), "n3", "/dev/null", 100, 50)); + final MockInternalClusterInfoService clusterInfoService = getMockInternalClusterInfoService(); + clusterInfoService.setUpdateFrequency(TimeValue.timeValueMillis(200)); + clusterInfoService.onMaster(); + + // prevent any effects from in-flight recoveries, since we are only simulating a 100-byte disk + clusterInfoService.shardSizeFunction = shardRouting -> 0L; + + // start with all nodes below the low watermark + clusterInfoService.diskUsageFunction = (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 100, between(15, 100)); final boolean watermarkBytes = randomBoolean(); // we have to consistently use bytes or percentage for the disk watermark settings - client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder() - .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), watermarkBytes ? "15b" : "85%") - .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), watermarkBytes ? "10b" : "90%") - .put( - DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), - watermarkBytes ? "5b" : "95%") - .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.getKey(), "150ms")).get(); + assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder() + .put(CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), watermarkBytes ? "10b" : "90%") + .put(CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), watermarkBytes ? "10b" : "90%") + .put(CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), watermarkBytes ? "5b" : "95%") + .put(CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.getKey(), "150ms"))); + // Create an index with 6 shards so we can check allocation for it prepareCreate("test").setSettings(Settings.builder() .put("number_of_shards", 6) .put("number_of_replicas", 0)).get(); ensureGreen("test"); - // Block until the "fake" cluster info is retrieved at least once - assertBusy(() -> { - ClusterInfo info = cis.getClusterInfo(); - logger.info("--> got: {} nodes", info.getNodeLeastAvailableDiskUsages().size()); - assertThat(info.getNodeLeastAvailableDiskUsages().size(), greaterThan(0)); - }); - - final List realNodeNames = new ArrayList<>(); - ClusterStateResponse resp = client().admin().cluster().prepareState().get(); - for (RoutingNode node : resp.getState().getRoutingNodes()) { - realNodeNames.add(node.nodeId()); - logger.info("--> node {} has {} shards", - node.nodeId(), resp.getState().getRoutingNodes().node(node.nodeId()).numberOfOwningShards()); + { + final Map shardCountByNodeId = getShardCountByNodeId(); + assertThat("node0 has 2 shards", shardCountByNodeId.get(nodeIds.get(0)), equalTo(2)); + assertThat("node1 has 2 shards", shardCountByNodeId.get(nodeIds.get(1)), equalTo(2)); + assertThat("node2 has 2 shards", shardCountByNodeId.get(nodeIds.get(2)), equalTo(2)); } - client().prepareIndex("test", "doc", "1").setSource("{\"foo\": \"bar\"}", XContentType.JSON) - .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); - assertSearchHits(client().prepareSearch().get(), "1"); + client().prepareIndex("test", "doc", "1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); + assertSearchHits(client().prepareSearch("test").get(), "1"); - // Block all nodes so that re-balancing does not occur (BalancedShardsAllocator) - cis.setN1Usage(realNodeNames.get(0), new DiskUsage(nodes.get(0), "n1", "_na_", 100, 3)); - cis.setN2Usage(realNodeNames.get(1), new DiskUsage(nodes.get(1), "n2", "_na_", 100, 3)); - cis.setN3Usage(realNodeNames.get(2), new DiskUsage(nodes.get(2), "n3", "_na_", 100, 3)); + // Move all nodes above the low watermark so no shard movement can occur, and at least one node above the flood stage watermark so + // the index is blocked + clusterInfoService.diskUsageFunction = (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 100, + discoveryNode.getId().equals(nodeIds.get(2)) ? between(0, 4) : between(0, 14)); - // Wait until index "test" is blocked - assertBusy(() -> assertBlocked(client().prepareIndex().setIndex("test").setType("doc").setId("1").setSource("foo", "bar"), + assertBusy(() -> assertBlocked( + client().prepareIndex().setIndex("test").setType("doc").setId("1").setSource("foo", "bar"), IndexMetaData.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK)); assertFalse(client().admin().cluster().prepareHealth("test").setWaitForEvents(Priority.LANGUID).get().isTimedOut()); @@ -208,23 +196,234 @@ public void testAutomaticReleaseOfIndexBlock() throws Exception { // Cannot add further documents assertBlocked(client().prepareIndex().setIndex("test").setType("doc").setId("2").setSource("foo", "bar"), IndexMetaData.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK); - assertSearchHits(client().prepareSearch().get(), "1"); + assertSearchHits(client().prepareSearch("test").get(), "1"); + + logger.info("--> index is confirmed read-only, releasing disk space"); - // Update the disk usages so all nodes are back under the high and flood watermarks - cis.setN1Usage(realNodeNames.get(0), new DiskUsage(nodes.get(0), "n1", "_na_", 100, 11)); - cis.setN2Usage(realNodeNames.get(1), new DiskUsage(nodes.get(1), "n2", "_na_", 100, 11)); - cis.setN3Usage(realNodeNames.get(2), new DiskUsage(nodes.get(2), "n3", "_na_", 100, 11)); + // Move all nodes below the high watermark so that the index is unblocked + clusterInfoService.diskUsageFunction = (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 100, between(10, 100)); // Attempt to create a new document until DiskUsageMonitor unblocks the index assertBusy(() -> { try { - client().prepareIndex("test", "doc", "3").setSource("{\"foo\": \"bar\"}", XContentType.JSON) + client().prepareIndex("test", "doc", "3").setSource("foo", "bar") .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); } catch (ClusterBlockException e) { throw new AssertionError("retrying", e); } }); - assertSearchHits(client().prepareSearch().get(), "1", "3"); + assertSearchHits(client().prepareSearch("test").get(), "1", "3"); + } + + public void testOnlyMovesEnoughShardsToDropBelowHighWatermark() throws Exception { + for (int i = 0; i < 3; i++) { + // ensure that each node has a single data path + internalCluster().startNode(Settings.builder().put(Environment.PATH_DATA_SETTING.getKey(), createTempDir())); + } + + final MockInternalClusterInfoService clusterInfoService = getMockInternalClusterInfoService(); + + final AtomicReference masterAppliedClusterState = new AtomicReference<>(); + internalCluster().getCurrentMasterNodeInstance(ClusterService.class).addListener(event -> { + masterAppliedClusterState.set(event.state()); + clusterInfoService.refresh(); // so that a subsequent reroute sees disk usage according to the current state + }); + + // shards are 1 byte large + clusterInfoService.shardSizeFunction = shardRouting -> 1L; + + // start with all nodes below the watermark + clusterInfoService.diskUsageFunction = (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 1000L, 1000L); + + assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder() + .put(CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "90%") + .put(CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "90%") + .put(CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), "100%") + .put(CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.getKey(), "1ms"))); + + final List nodeIds = StreamSupport.stream(client().admin().cluster().prepareState().get().getState() + .getRoutingNodes().spliterator(), false).map(RoutingNode::nodeId).collect(Collectors.toList()); + + assertAcked(prepareCreate("test").setSettings(Settings.builder().put("number_of_shards", 6).put("number_of_replicas", 0))); + + ensureGreen("test"); + + assertBusy(() -> { + final Map shardCountByNodeId = getShardCountByNodeId(); + assertThat("node0 has 2 shards", shardCountByNodeId.get(nodeIds.get(0)), equalTo(2)); + assertThat("node1 has 2 shards", shardCountByNodeId.get(nodeIds.get(1)), equalTo(2)); + assertThat("node2 has 2 shards", shardCountByNodeId.get(nodeIds.get(2)), equalTo(2)); + }); + + // disable rebalancing, or else we might move too many shards away and then rebalance them back again + assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder() + .put(CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), EnableAllocationDecider.Rebalance.NONE))); + + // node2 suddenly has 99 bytes free, less than 10%, but moving one shard is enough to bring it up to 100 bytes free: + clusterInfoService.diskUsageFunction = (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 1000L, + discoveryNode.getId().equals(nodeIds.get(2)) + ? 101L - masterAppliedClusterState.get().getRoutingNodes().node(nodeIds.get(2)).numberOfOwningShards() + : 1000L); + + clusterInfoService.refresh(); + + logger.info("--> waiting for shards to relocate off node [{}]", nodeIds.get(2)); + + // must wait for relocation to start + assertBusy(() -> assertThat("node2 has 1 shard", getShardCountByNodeId().get(nodeIds.get(2)), equalTo(1))); + + // ensure that relocations finished without moving any more shards + ensureGreen("test"); + assertThat("node2 has 1 shard", getShardCountByNodeId().get(nodeIds.get(2)), equalTo(1)); + } + + public void testDoesNotExceedLowWatermarkWhenRebalancing() throws Exception { + for (int i = 0; i < 3; i++) { + // ensure that each node has a single data path + internalCluster().startNode(Settings.builder().put(Environment.PATH_DATA_SETTING.getKey(), createTempDir())); + } + + final AtomicReference masterAppliedClusterState = new AtomicReference<>(); + + final MockInternalClusterInfoService clusterInfoService = getMockInternalClusterInfoService(); + + final List nodeIds = StreamSupport.stream(client().admin().cluster().prepareState().get().getState() + .getRoutingNodes().spliterator(), false).map(RoutingNode::nodeId).collect(Collectors.toList()); + + internalCluster().getCurrentMasterNodeInstance(ClusterService.class).addListener(event -> { + assertThat(event.state().getRoutingNodes().node(nodeIds.get(2)).size(), lessThanOrEqualTo(1)); + masterAppliedClusterState.set(event.state()); + clusterInfoService.refresh(); // so that a subsequent reroute sees disk usage according to the current state + }); + + assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder() + .put(CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "85%") + .put(CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "100%") + .put(CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), "100%"))); + + // shards are 1 byte large + clusterInfoService.shardSizeFunction = shardRouting -> 1L; + + // node 2 only has space for one shard + clusterInfoService.diskUsageFunction = (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 1000L, + discoveryNode.getId().equals(nodeIds.get(2)) + ? 150L - masterAppliedClusterState.get().getRoutingNodes().node(nodeIds.get(2)).numberOfOwningShards() + : 1000L); + + assertAcked(prepareCreate("test").setSettings(Settings.builder() + .put("number_of_shards", 6) + .put("number_of_replicas", 0) + .put(IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING.getConcreteSettingForNamespace("_id").getKey(), nodeIds.get(2)))); + ensureGreen("test"); + + assertBusy(() -> { + final Map shardCountByNodeId = getShardCountByNodeId(); + assertThat("node0 has 3 shards", shardCountByNodeId.get(nodeIds.get(0)), equalTo(3)); + assertThat("node1 has 3 shards", shardCountByNodeId.get(nodeIds.get(1)), equalTo(3)); + assertThat("node2 has 0 shards", shardCountByNodeId.get(nodeIds.get(2)), equalTo(0)); + }); + + assertAcked(client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder() + .putNull(IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING.getConcreteSettingForNamespace("_id").getKey()))); + + logger.info("--> waiting for shards to relocate onto node [{}]", nodeIds.get(2)); + + ensureGreen("test"); + assertThat("node2 has 1 shard", getShardCountByNodeId().get(nodeIds.get(2)), equalTo(1)); + } + + public void testMovesShardsOffSpecificDataPathAboveWatermark() throws Exception { + + // start one node with two data paths + final Path pathOverWatermark = createTempDir(); + final Settings.Builder twoPathSettings = Settings.builder(); + if (randomBoolean()) { + twoPathSettings.putList(Environment.PATH_DATA_SETTING.getKey(), createTempDir().toString(), pathOverWatermark.toString()); + } else { + twoPathSettings.putList(Environment.PATH_DATA_SETTING.getKey(), pathOverWatermark.toString(), createTempDir().toString()); + } + internalCluster().startNode(twoPathSettings); + final String nodeWithTwoPaths = client().admin().cluster().prepareNodesInfo().get().getNodes().get(0).getNode().getId(); + + // other two nodes have one data path each + internalCluster().startNode(Settings.builder().put(Environment.PATH_DATA_SETTING.getKey(), createTempDir())); + internalCluster().startNode(Settings.builder().put(Environment.PATH_DATA_SETTING.getKey(), createTempDir())); + + final MockInternalClusterInfoService clusterInfoService = getMockInternalClusterInfoService(); + + // prevent any effects from in-flight recoveries, since we are only simulating a 100-byte disk + clusterInfoService.shardSizeFunction = shardRouting -> 0L; + + // start with all paths below the watermark + clusterInfoService.diskUsageFunction = (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 100, between(10, 100)); + + assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder() + .put(CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "90%") + .put(CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "90%") + .put(CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), "100%"))); + + final List nodeIds = StreamSupport.stream(client().admin().cluster().prepareState().get().getState() + .getRoutingNodes().spliterator(), false).map(RoutingNode::nodeId).collect(Collectors.toList()); + + assertAcked(prepareCreate("test").setSettings(Settings.builder().put("number_of_shards", 6).put("number_of_replicas", 0))); + + ensureGreen("test"); + + { + final Map shardCountByNodeId = getShardCountByNodeId(); + assertThat("node0 has 2 shards", shardCountByNodeId.get(nodeIds.get(0)), equalTo(2)); + assertThat("node1 has 2 shards", shardCountByNodeId.get(nodeIds.get(1)), equalTo(2)); + assertThat("node2 has 2 shards", shardCountByNodeId.get(nodeIds.get(2)), equalTo(2)); + } + + final long shardsOnGoodPath = Arrays.stream(client().admin().indices().prepareStats("test").get().getShards()) + .filter(shardStats -> shardStats.getShardRouting().currentNodeId().equals(nodeWithTwoPaths) + && shardStats.getDataPath().startsWith(pathOverWatermark.toString()) == false).count(); + logger.info("--> shards on good path: [{}]", shardsOnGoodPath); + + // one of the paths on node0 suddenly exceeds the high watermark + clusterInfoService.diskUsageFunction = (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 100L, + fsInfoPath.getPath().startsWith(pathOverWatermark.toString()) ? between(0, 9) : between(10, 100)); + + // disable rebalancing, or else we might move shards back onto the over-full path since we're not faking that + assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder() + .put(CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), EnableAllocationDecider.Rebalance.NONE))); + + clusterInfoService.refresh(); + + logger.info("--> waiting for shards to relocate off path [{}]", pathOverWatermark); + + assertBusy(() -> { + for (final ShardStats shardStats : client().admin().indices().prepareStats("test").get().getShards()) { + assertThat(shardStats.getDataPath(), not(startsWith(pathOverWatermark.toString()))); + } + }); + + ensureGreen("test"); + + for (final ShardStats shardStats : client().admin().indices().prepareStats("test").get().getShards()) { + assertThat(shardStats.getDataPath(), not(startsWith(pathOverWatermark.toString()))); + } + + assertThat("should not have moved any shards off of the path that wasn't too full", + Arrays.stream(client().admin().indices().prepareStats("test").get().getShards()) + .filter(shardStats -> shardStats.getShardRouting().currentNodeId().equals(nodeWithTwoPaths) + && shardStats.getDataPath().startsWith(pathOverWatermark.toString()) == false).count(), equalTo(shardsOnGoodPath)); + } + + private Map getShardCountByNodeId() { + final Map shardCountByNodeId = new HashMap<>(); + final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); + for (final RoutingNode node : clusterState.getRoutingNodes()) { + logger.info("----> node {} has {} shards", + node.nodeId(), clusterState.getRoutingNodes().node(node.nodeId()).numberOfOwningShards()); + shardCountByNodeId.put(node.nodeId(), clusterState.getRoutingNodes().node(node.nodeId()).numberOfOwningShards()); + } + return shardCountByNodeId; + } + + private MockInternalClusterInfoService getMockInternalClusterInfoService() { + return (MockInternalClusterInfoService) internalCluster().getCurrentMasterNodeInstance(ClusterInfoService.class); } } diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java b/test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java index dfe21ee429406..7ec046472e344 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java @@ -18,110 +18,80 @@ */ package org.elasticsearch.cluster; -import org.elasticsearch.Version; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; -import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; -import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.monitor.fs.FsInfo; import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; -import java.util.Arrays; -import java.util.Collections; -import java.util.concurrent.CountDownLatch; +import java.util.List; +import java.util.function.BiFunction; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; -import static java.util.Collections.emptyMap; -import static java.util.Collections.emptySet; - -/** - * Fake ClusterInfoService class that allows updating the nodes stats disk - * usage with fake values - */ public class MockInternalClusterInfoService extends InternalClusterInfoService { /** This is a marker plugin used to trigger MockNode to use this mock info service. */ public static class TestPlugin extends Plugin {} - private final ClusterName clusterName; - private volatile NodeStats[] stats = new NodeStats[3]; + @Nullable // if no fakery should take place + public volatile Function shardSizeFunction; - /** Create a fake NodeStats for the given node and usage */ - public static NodeStats makeStats(String nodeName, DiskUsage usage) { - FsInfo.Path[] paths = new FsInfo.Path[1]; - FsInfo.Path path = new FsInfo.Path("/dev/null", null, - usage.getTotalBytes(), usage.getFreeBytes(), usage.getFreeBytes()); - paths[0] = path; - FsInfo fsInfo = new FsInfo(System.currentTimeMillis(), null, paths); - return new NodeStats( - new DiscoveryNode(nodeName, ESTestCase.buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT), - System.currentTimeMillis(), - null, null, null, null, null, - fsInfo, - null, null, null, - null, null, null, null); - } + @Nullable // if no fakery should take place + public volatile BiFunction diskUsageFunction; public MockInternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, NodeClient client) { super(settings, clusterService, threadPool, client); - this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings); - stats[0] = makeStats("node_t1", new DiskUsage("node_t1", "n1", "/dev/null", 100, 100)); - stats[1] = makeStats("node_t2", new DiskUsage("node_t2", "n2", "/dev/null", 100, 100)); - stats[2] = makeStats("node_t3", new DiskUsage("node_t3", "n3", "/dev/null", 100, 100)); - } - - public void setN1Usage(String nodeName, DiskUsage newUsage) { - stats[0] = makeStats(nodeName, newUsage); - } - - public void setN2Usage(String nodeName, DiskUsage newUsage) { - stats[1] = makeStats(nodeName, newUsage); - } - - public void setN3Usage(String nodeName, DiskUsage newUsage) { - stats[2] = makeStats(nodeName, newUsage); } @Override - public CountDownLatch updateNodeStats(final ActionListener listener) { - NodesStatsResponse response = new NodesStatsResponse(clusterName, Arrays.asList(stats), Collections.emptyList()); - listener.onResponse(response); - return new CountDownLatch(0); + public ClusterInfo getClusterInfo() { + final ClusterInfo clusterInfo = super.getClusterInfo(); + return new SizeFakingClusterInfo(clusterInfo); } @Override - public CountDownLatch updateIndicesStats(final ActionListener listener) { - // Not used, so noop - return new CountDownLatch(0); - } + List adjustNodesStats(List nodesStats) { + final BiFunction diskUsageFunction = this.diskUsageFunction; + if (diskUsageFunction == null) { + return nodesStats; + } - @Override - public ClusterInfo getClusterInfo() { - ClusterInfo clusterInfo = super.getClusterInfo(); - return new DevNullClusterInfo(clusterInfo.getNodeLeastAvailableDiskUsages(), - clusterInfo.getNodeMostAvailableDiskUsages(), clusterInfo.shardSizes); + return nodesStats.stream().map(nodeStats -> { + final DiscoveryNode discoveryNode = nodeStats.getNode(); + final FsInfo oldFsInfo = nodeStats.getFs(); + return new NodeStats(discoveryNode, nodeStats.getTimestamp(), nodeStats.getIndices(), nodeStats.getOs(), + nodeStats.getProcess(), nodeStats.getJvm(), nodeStats.getThreadPool(), new FsInfo(oldFsInfo.getTimestamp(), + oldFsInfo.getIoStats(), + StreamSupport.stream(oldFsInfo.spliterator(), false) + .map(fsInfoPath -> diskUsageFunction.apply(discoveryNode, fsInfoPath)) + .toArray(FsInfo.Path[]::new)), nodeStats.getTransport(), + nodeStats.getHttp(), nodeStats.getBreaker(), nodeStats.getScriptStats(), nodeStats.getDiscoveryStats(), + nodeStats.getIngestStats(), nodeStats.getAdaptiveSelectionStats()); + }).collect(Collectors.toList()); } - /** - * ClusterInfo that always points to DevNull. - */ - public static class DevNullClusterInfo extends ClusterInfo { - public DevNullClusterInfo(ImmutableOpenMap leastAvailableSpaceUsage, - ImmutableOpenMap mostAvailableSpaceUsage, ImmutableOpenMap shardSizes) { - super(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, null); + class SizeFakingClusterInfo extends ClusterInfo { + SizeFakingClusterInfo(ClusterInfo delegate) { + super(delegate.getNodeLeastAvailableDiskUsages(), delegate.getNodeMostAvailableDiskUsages(), + delegate.shardSizes, delegate.routingToDataPath); } @Override - public String getDataPath(ShardRouting shardRouting) { - return "/dev/null"; + public Long getShardSize(ShardRouting shardRouting) { + final Function shardSizeFunction = MockInternalClusterInfoService.this.shardSizeFunction; + if (shardSizeFunction == null) { + return super.getShardSize(shardRouting); + } + + return shardSizeFunction.apply(shardRouting); } }