19
19
20
20
package org .elasticsearch .cluster .routing .allocation .decider ;
21
21
22
- import org .elasticsearch .action .admin .cluster .node .stats .NodesStatsResponse ;
23
- import org .elasticsearch .action .admin .cluster .state .ClusterStateResponse ;
24
22
import org .elasticsearch .cluster .ClusterInfo ;
25
23
import org .elasticsearch .cluster .ClusterInfoService ;
24
+ import org .elasticsearch .cluster .ClusterState ;
26
25
import org .elasticsearch .cluster .DiskUsage ;
27
26
import org .elasticsearch .cluster .MockInternalClusterInfoService ;
28
27
import org .elasticsearch .cluster .routing .RoutingNode ;
33
32
import org .elasticsearch .test .ESIntegTestCase ;
34
33
35
34
import java .util .ArrayList ;
36
- import java .util .Arrays ;
37
35
import java .util .Collection ;
36
+ import java .util .Collections ;
38
37
import java .util .HashMap ;
39
- import java .util .Iterator ;
40
38
import java .util .List ;
41
39
import java .util .Map ;
42
40
@@ -50,21 +48,15 @@ public class MockDiskUsagesIT extends ESIntegTestCase {
50
48
@ Override
51
49
protected Collection <Class <? extends Plugin >> nodePlugins () {
52
50
// Use the mock internal cluster info service, which has fake-able disk usages
53
- return Arrays . asList (MockInternalClusterInfoService .TestPlugin .class );
51
+ return Collections . singletonList (MockInternalClusterInfoService .TestPlugin .class );
54
52
}
55
53
56
54
public void testRerouteOccursOnDiskPassingHighWatermark () throws Exception {
57
55
List <String > nodes = internalCluster ().startNodes (3 );
58
56
59
- // Wait for all 3 nodes to be up
60
- assertBusy (() -> {
61
- NodesStatsResponse resp = client ().admin ().cluster ().prepareNodesStats ().get ();
62
- assertThat (resp .getNodes ().size (), equalTo (3 ));
63
- });
64
-
65
57
// Start with all nodes at 50% usage
66
58
final MockInternalClusterInfoService cis = (MockInternalClusterInfoService )
67
- internalCluster ().getInstance (ClusterInfoService .class , internalCluster ().getMasterName ());
59
+ internalCluster ().getInstance (ClusterInfoService .class , internalCluster ().getMasterName ());
68
60
cis .setUpdateFrequency (TimeValue .timeValueMillis (200 ));
69
61
cis .onMaster ();
70
62
cis .setN1Usage (nodes .get (0 ), new DiskUsage (nodes .get (0 ), "n1" , "/dev/null" , 100 , 50 ));
@@ -73,52 +65,48 @@ public void testRerouteOccursOnDiskPassingHighWatermark() throws Exception {
73
65
74
66
final boolean watermarkBytes = randomBoolean (); // we have to consistently use bytes or percentage for the disk watermark settings
75
67
client ().admin ().cluster ().prepareUpdateSettings ().setTransientSettings (Settings .builder ()
76
- .put (DiskThresholdSettings .CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING .getKey (), watermarkBytes ? "20b" : "80%" )
77
- .put (DiskThresholdSettings .CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING .getKey (), watermarkBytes ? "10b" : "90%" )
78
- .put (
79
- DiskThresholdSettings .CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING .getKey (),
80
- watermarkBytes ? "0b" : "100%" )
81
- .put (DiskThresholdSettings .CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING .getKey (), "1ms" )).get ();
68
+ .put (DiskThresholdSettings .CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING .getKey (), watermarkBytes ? "20b" : "80%" )
69
+ .put (DiskThresholdSettings .CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING .getKey (), watermarkBytes ? "10b" : "90%" )
70
+ .put (DiskThresholdSettings .CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING .getKey (),
71
+ watermarkBytes ? "0b" : "100%" )
72
+ .put (DiskThresholdSettings .CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING .getKey (), "1ms" )).get ();
82
73
// Create an index with 10 shards so we can check allocation for it
83
74
prepareCreate ("test" ).setSettings (Settings .builder ()
84
- .put ("number_of_shards" , 10 )
85
- .put ("number_of_replicas" , 0 )
86
- .put ("index.routing.allocation.exclude._name" , "" )).get ();
75
+ .put ("number_of_shards" , 10 )
76
+ .put ("number_of_replicas" , 0 )).get ();
87
77
ensureGreen ("test" );
88
78
89
79
// Block until the "fake" cluster info is retrieved at least once
90
80
assertBusy (() -> {
91
- ClusterInfo info = cis .getClusterInfo ();
81
+ final ClusterInfo info = cis .getClusterInfo ();
92
82
logger .info ("--> got: {} nodes" , info .getNodeLeastAvailableDiskUsages ().size ());
93
83
assertThat (info .getNodeLeastAvailableDiskUsages ().size (), greaterThan (0 ));
94
84
});
95
85
96
86
final List <String > realNodeNames = new ArrayList <>();
97
- ClusterStateResponse resp = client (). admin (). cluster (). prepareState (). get ();
98
- Iterator < RoutingNode > iter = resp . getState ().getRoutingNodes ().iterator ();
99
- while ( iter . hasNext ()) {
100
- RoutingNode node = iter . next ( );
101
- realNodeNames . add ( node . nodeId ());
102
- logger . info ( "--> node {} has {} shards" ,
103
- node . nodeId (), resp . getState (). getRoutingNodes (). node ( node . nodeId ()). numberOfOwningShards ());
87
+ {
88
+ final ClusterState clusterState = client (). admin ().cluster ().prepareState (). get (). getState ();
89
+ for ( final RoutingNode node : clusterState . getRoutingNodes ()) {
90
+ realNodeNames . add ( node . nodeId () );
91
+ logger . info ( "--> node {} has {} shards" ,
92
+ node . nodeId (), clusterState . getRoutingNodes (). node ( node . nodeId ()). numberOfOwningShards ());
93
+ }
104
94
}
105
95
106
96
// Update the disk usages so one node has now passed the high watermark
107
97
cis .setN1Usage (realNodeNames .get (0 ), new DiskUsage (nodes .get (0 ), "n1" , "_na_" , 100 , 50 ));
108
98
cis .setN2Usage (realNodeNames .get (1 ), new DiskUsage (nodes .get (1 ), "n2" , "_na_" , 100 , 50 ));
109
99
cis .setN3Usage (realNodeNames .get (2 ), new DiskUsage (nodes .get (2 ), "n3" , "_na_" , 100 , 0 )); // nothing free on node3
110
100
111
- // Retrieve the count of shards on each node
112
- final Map <String , Integer > nodesToShardCount = new HashMap <>();
113
-
114
101
assertBusy (() -> {
115
- ClusterStateResponse resp12 = client ().admin ().cluster ().prepareState ().get ();
116
- Iterator <RoutingNode > iter12 = resp12 .getState ().getRoutingNodes ().iterator ();
117
- while (iter12 .hasNext ()) {
118
- RoutingNode node = iter12 .next ();
102
+ final ClusterState clusterState = client ().admin ().cluster ().prepareState ().get ().getState ();
103
+ logger .info ("--> {}" , clusterState .routingTable ());
104
+
105
+ final Map <String , Integer > nodesToShardCount = new HashMap <>();
106
+ for (final RoutingNode node : clusterState .getRoutingNodes ()) {
119
107
logger .info ("--> node {} has {} shards" ,
120
- node .nodeId (), resp12 . getState () .getRoutingNodes ().node (node .nodeId ()).numberOfOwningShards ());
121
- nodesToShardCount .put (node .nodeId (), resp12 . getState () .getRoutingNodes ().node (node .nodeId ()).numberOfOwningShards ());
108
+ node .nodeId (), clusterState .getRoutingNodes ().node (node .nodeId ()).numberOfOwningShards ());
109
+ nodesToShardCount .put (node .nodeId (), clusterState .getRoutingNodes ().node (node .nodeId ()).numberOfOwningShards ());
122
110
}
123
111
assertThat ("node1 has 5 shards" , nodesToShardCount .get (realNodeNames .get (0 )), equalTo (5 ));
124
112
assertThat ("node2 has 5 shards" , nodesToShardCount .get (realNodeNames .get (1 )), equalTo (5 ));
@@ -130,17 +118,13 @@ public void testRerouteOccursOnDiskPassingHighWatermark() throws Exception {
130
118
cis .setN2Usage (realNodeNames .get (1 ), new DiskUsage (nodes .get (1 ), "n2" , "_na_" , 100 , 50 ));
131
119
cis .setN3Usage (realNodeNames .get (2 ), new DiskUsage (nodes .get (2 ), "n3" , "_na_" , 100 , 50 )); // node3 has free space now
132
120
133
- // Retrieve the count of shards on each node
134
- nodesToShardCount .clear ();
135
-
136
121
assertBusy (() -> {
137
- ClusterStateResponse resp1 = client ().admin ().cluster ().prepareState ().get ();
138
- Iterator <RoutingNode > iter1 = resp1 .getState ().getRoutingNodes ().iterator ();
139
- while (iter1 .hasNext ()) {
140
- RoutingNode node = iter1 .next ();
122
+ final Map <String , Integer > nodesToShardCount = new HashMap <>();
123
+ final ClusterState clusterState = client ().admin ().cluster ().prepareState ().get ().getState ();
124
+ for (final RoutingNode node : clusterState .getRoutingNodes ()) {
141
125
logger .info ("--> node {} has {} shards" ,
142
- node .nodeId (), resp1 . getState () .getRoutingNodes ().node (node .nodeId ()).numberOfOwningShards ());
143
- nodesToShardCount .put (node .nodeId (), resp1 . getState () .getRoutingNodes ().node (node .nodeId ()).numberOfOwningShards ());
126
+ node .nodeId (), clusterState .getRoutingNodes ().node (node .nodeId ()).numberOfOwningShards ());
127
+ nodesToShardCount .put (node .nodeId (), clusterState .getRoutingNodes ().node (node .nodeId ()).numberOfOwningShards ());
144
128
}
145
129
assertThat ("node1 has at least 3 shards" , nodesToShardCount .get (realNodeNames .get (0 )), greaterThanOrEqualTo (3 ));
146
130
assertThat ("node2 has at least 3 shards" , nodesToShardCount .get (realNodeNames .get (1 )), greaterThanOrEqualTo (3 ));
0 commit comments