|
56 | 56 | import java.util.HashMap;
|
57 | 57 | import java.util.HashSet;
|
58 | 58 | import java.util.Map;
|
| 59 | +import java.util.concurrent.atomic.AtomicReference; |
59 | 60 |
|
60 | 61 | import static java.util.Collections.emptyMap;
|
61 | 62 | import static java.util.Collections.singleton;
|
62 | 63 | import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
|
63 | 64 | import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
|
64 | 65 | import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
|
65 | 66 | import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED;
|
| 67 | +import static org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING; |
66 | 68 | import static org.hamcrest.Matchers.containsString;
|
67 | 69 | import static org.hamcrest.Matchers.equalTo;
|
68 | 70 | import static org.hamcrest.Matchers.nullValue;
|
@@ -653,18 +655,19 @@ public void testShardRelocationsTakenIntoAccount() {
|
653 | 655 | final ClusterInfo clusterInfo = new DevNullClusterInfo(usages, usages, shardSizes);
|
654 | 656 |
|
655 | 657 | DiskThresholdDecider decider = makeDecider(diskSettings);
|
| 658 | + final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); |
656 | 659 | AllocationDeciders deciders = new AllocationDeciders(
|
657 |
| - new HashSet<>(Arrays.asList(new SameShardAllocationDecider( |
658 |
| - Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) |
659 |
| - ), decider))); |
| 660 | + new HashSet<>(Arrays.asList( |
| 661 | + new SameShardAllocationDecider(Settings.EMPTY, clusterSettings), |
| 662 | + new EnableAllocationDecider( |
| 663 | + Settings.builder().put(CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), "none").build(), clusterSettings), |
| 664 | + decider))); |
660 | 665 |
|
661 |
| - ClusterInfoService cis = () -> { |
662 |
| - logger.info("--> calling fake getClusterInfo"); |
663 |
| - return clusterInfo; |
664 |
| - }; |
| 666 | + final AtomicReference<ClusterInfo> clusterInfoReference = new AtomicReference<>(clusterInfo); |
| 667 | + final ClusterInfoService cis = clusterInfoReference::get; |
665 | 668 |
|
666 | 669 | AllocationService strategy = new AllocationService(deciders, new TestGatewayAllocator(),
|
667 |
| - new BalancedShardsAllocator(Settings.EMPTY), cis); |
| 670 | + new BalancedShardsAllocator(Settings.EMPTY), cis); |
668 | 671 |
|
669 | 672 | MetaData metaData = MetaData.builder()
|
670 | 673 | .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
|
@@ -702,30 +705,66 @@ Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLU
|
702 | 705 | .add(newNode("node3"))
|
703 | 706 | ).build();
|
704 | 707 |
|
705 |
| - AllocationCommand relocate1 = new MoveAllocationCommand("test", 0, "node2", "node3"); |
706 |
| - AllocationCommands cmds = new AllocationCommands(relocate1); |
| 708 | + { |
| 709 | + AllocationCommand moveAllocationCommand = new MoveAllocationCommand("test", 0, "node2", "node3"); |
| 710 | + AllocationCommands cmds = new AllocationCommands(moveAllocationCommand); |
707 | 711 |
|
708 |
| - clusterState = strategy.reroute(clusterState, cmds, false, false).getClusterState(); |
709 |
| - logShardStates(clusterState); |
| 712 | + clusterState = strategy.reroute(clusterState, cmds, false, false).getClusterState(); |
| 713 | + logShardStates(clusterState); |
| 714 | + } |
| 715 | + |
| 716 | + final ImmutableOpenMap.Builder<String, DiskUsage> overfullUsagesBuilder = ImmutableOpenMap.builder(); |
| 717 | + overfullUsagesBuilder.put("node1", new DiskUsage("node1", "n1", "/dev/null", 100, 40)); // 60% used |
| 718 | + overfullUsagesBuilder.put("node2", new DiskUsage("node2", "n2", "/dev/null", 100, 40)); // 60% used |
| 719 | + overfullUsagesBuilder.put("node3", new DiskUsage("node3", "n3", "/dev/null", 100, 0)); // 100% used |
| 720 | + final ImmutableOpenMap<String, DiskUsage> overfullUsages = overfullUsagesBuilder.build(); |
| 721 | + |
| 722 | + final ImmutableOpenMap.Builder<String, Long> largerShardSizesBuilder = ImmutableOpenMap.builder(); |
| 723 | + largerShardSizesBuilder.put("[test][0][p]", 14L); |
| 724 | + largerShardSizesBuilder.put("[test][0][r]", 14L); |
| 725 | + largerShardSizesBuilder.put("[test2][0][p]", 2L); |
| 726 | + largerShardSizesBuilder.put("[test2][0][r]", 2L); |
| 727 | + final ImmutableOpenMap<String, Long> largerShardSizes = largerShardSizesBuilder.build(); |
710 | 728 |
|
711 |
| - AllocationCommand relocate2 = new MoveAllocationCommand("test2", 0, "node2", "node3"); |
712 |
| - cmds = new AllocationCommands(relocate2); |
713 |
| - |
714 |
| - try { |
715 |
| - // The shard for the "test" index is already being relocated to |
716 |
| - // node3, which will put it over the low watermark when it |
717 |
| - // completes, with shard relocations taken into account this should |
718 |
| - // throw an exception about not being able to complete |
719 |
| - strategy.reroute(clusterState, cmds, false, false); |
720 |
| - fail("should not have been able to reroute the shard"); |
721 |
| - } catch (IllegalArgumentException e) { |
722 |
| - assertThat("can't be allocated because there isn't enough room: " + e.getMessage(), |
723 |
| - e.getMessage(), |
724 |
| - containsString("the node is above the low watermark cluster setting " + |
725 |
| - "[cluster.routing.allocation.disk.watermark.low=0.7], using more disk space than the maximum " + |
726 |
| - "allowed [70.0%], actual free: [26.0%]")); |
| 729 | + final ClusterInfo overfullClusterInfo = new DevNullClusterInfo(overfullUsages, overfullUsages, largerShardSizes); |
| 730 | + |
| 731 | + { |
| 732 | + AllocationCommand moveAllocationCommand = new MoveAllocationCommand("test2", 0, "node2", "node3"); |
| 733 | + AllocationCommands cmds = new AllocationCommands(moveAllocationCommand); |
| 734 | + |
| 735 | + final ClusterState clusterStateThatRejectsCommands = clusterState; |
| 736 | + |
| 737 | + assertThat(expectThrows(IllegalArgumentException.class, |
| 738 | + () -> strategy.reroute(clusterStateThatRejectsCommands, cmds, false, false)).getMessage(), |
| 739 | + containsString("the node is above the low watermark cluster setting " + |
| 740 | + "[cluster.routing.allocation.disk.watermark.low=0.7], using more disk space than the maximum " + |
| 741 | + "allowed [70.0%], actual free: [26.0%]")); |
| 742 | + |
| 743 | + clusterInfoReference.set(overfullClusterInfo); |
| 744 | + |
| 745 | + assertThat(expectThrows(IllegalArgumentException.class, |
| 746 | + () -> strategy.reroute(clusterStateThatRejectsCommands, cmds, false, false)).getMessage(), |
| 747 | + containsString("the node has fewer free bytes remaining than the total size of all incoming shards")); |
| 748 | + |
| 749 | + clusterInfoReference.set(clusterInfo); |
727 | 750 | }
|
728 | 751 |
|
| 752 | + { |
| 753 | + AllocationCommand moveAllocationCommand = new MoveAllocationCommand("test2", 0, "node2", "node3"); |
| 754 | + AllocationCommands cmds = new AllocationCommands(moveAllocationCommand); |
| 755 | + |
| 756 | + logger.info("--> before starting: {}", clusterState); |
| 757 | + clusterState = startInitializingShardsAndReroute(strategy, clusterState); |
| 758 | + logger.info("--> after starting: {}", clusterState); |
| 759 | + clusterState = strategy.reroute(clusterState, cmds, false, false).getClusterState(); |
| 760 | + logger.info("--> after running another command: {}", clusterState); |
| 761 | + logShardStates(clusterState); |
| 762 | + |
| 763 | + clusterInfoReference.set(overfullClusterInfo); |
| 764 | + |
| 765 | + clusterState = strategy.reroute(clusterState, "foo"); |
| 766 | + logger.info("--> after another reroute: {}", clusterState); |
| 767 | + } |
729 | 768 | }
|
730 | 769 |
|
731 | 770 | public void testCanRemainWithShardRelocatingAway() {
|
|
0 commit comments