22
22
import org .elasticsearch .action .admin .cluster .configuration .AddVotingConfigExclusionsRequest ;
23
23
import org .elasticsearch .action .admin .cluster .configuration .ClearVotingConfigExclusionsAction ;
24
24
import org .elasticsearch .action .admin .cluster .configuration .ClearVotingConfigExclusionsRequest ;
25
+ import org .elasticsearch .action .admin .cluster .health .ClusterHealthRequest ;
25
26
import org .elasticsearch .action .admin .cluster .health .ClusterHealthRequestBuilder ;
26
27
import org .elasticsearch .action .admin .cluster .health .ClusterHealthResponse ;
27
28
import org .elasticsearch .action .admin .cluster .state .ClusterStateRequest ;
29
+ import org .elasticsearch .action .admin .indices .create .CreateIndexRequest ;
28
30
import org .elasticsearch .client .Client ;
29
31
import org .elasticsearch .client .Requests ;
30
32
import org .elasticsearch .cluster .metadata .IndexMetaData ;
34
36
import org .elasticsearch .common .Priority ;
35
37
import org .elasticsearch .common .settings .Settings ;
36
38
import org .elasticsearch .common .unit .TimeValue ;
39
+ import org .elasticsearch .discovery .Discovery ;
37
40
import org .elasticsearch .discovery .zen .ElectMasterService ;
38
41
import org .elasticsearch .env .NodeEnvironment ;
39
42
import org .elasticsearch .gateway .MetaStateService ;
43
+ import org .elasticsearch .plugins .Plugin ;
40
44
import org .elasticsearch .test .ESIntegTestCase ;
41
45
import org .elasticsearch .test .InternalTestCluster .RestartCallback ;
42
46
import org .elasticsearch .test .discovery .TestZenDiscovery ;
47
+ import org .elasticsearch .test .transport .MockTransportService ;
48
+ import org .elasticsearch .transport .TransportService ;
43
49
50
+ import java .util .Collection ;
51
+ import java .util .Collections ;
44
52
import java .util .List ;
45
53
import java .util .stream .Collectors ;
46
54
import java .util .stream .IntStream ;
47
55
import java .util .stream .StreamSupport ;
48
56
49
57
import static org .elasticsearch .cluster .coordination .ClusterBootstrapService .INITIAL_MASTER_NODES_SETTING ;
50
58
import static org .elasticsearch .cluster .coordination .Coordinator .ZEN1_BWC_TERM ;
59
+ import static org .elasticsearch .cluster .coordination .FollowersChecker .FOLLOWER_CHECK_ACTION_NAME ;
60
+ import static org .elasticsearch .cluster .coordination .JoinHelper .START_JOIN_ACTION_NAME ;
61
+ import static org .elasticsearch .cluster .coordination .PublicationTransportHandler .PUBLISH_STATE_ACTION_NAME ;
51
62
import static org .elasticsearch .cluster .routing .allocation .decider .EnableAllocationDecider .CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING ;
52
63
import static org .elasticsearch .cluster .routing .allocation .decider .FilterAllocationDecider .CLUSTER_ROUTING_EXCLUDE_GROUP_SETTING ;
53
64
import static org .elasticsearch .node .Node .NODE_NAME_SETTING ;
54
65
import static org .elasticsearch .test .InternalTestCluster .REMOVED_MINIMUM_MASTER_NODES ;
66
+ import static org .elasticsearch .test .hamcrest .ElasticsearchAssertions .assertAcked ;
55
67
import static org .hamcrest .Matchers .equalTo ;
56
68
import static org .hamcrest .Matchers .is ;
57
69
@@ -67,6 +79,10 @@ public class Zen1IT extends ESIntegTestCase {
67
79
.put (TestZenDiscovery .USE_ZEN2 .getKey (), true )
68
80
.build ();
69
81
82
+ protected Collection <Class <? extends Plugin >> nodePlugins () {
83
+ return Collections .singletonList (MockTransportService .TestPlugin .class );
84
+ }
85
+
70
86
public void testZen2NodesJoiningZen1Cluster () {
71
87
internalCluster ().startNodes (randomIntBetween (1 , 3 ), ZEN1_SETTINGS );
72
88
internalCluster ().startNodes (randomIntBetween (1 , 3 ), ZEN2_SETTINGS );
@@ -79,6 +95,56 @@ public void testZen1NodesJoiningZen2Cluster() {
79
95
createIndex ("test" );
80
96
}
81
97
98
+ public void testMixedClusterDisruption () throws Exception {
99
+ final List <String > nodes = internalCluster ().startNodes (IntStream .range (0 , 5 )
100
+ .mapToObj (i -> i < 2 ? ZEN1_SETTINGS : ZEN2_SETTINGS ).toArray (Settings []::new ));
101
+
102
+ final List <MockTransportService > transportServices = nodes .stream ()
103
+ .map (n -> (MockTransportService ) internalCluster ().getInstance (TransportService .class , n )).collect (Collectors .toList ());
104
+
105
+ logger .info ("--> disrupting communications" );
106
+
107
+ // The idea here is to make some of the Zen2 nodes believe the Zen1 nodes have gone away by introducing a network partition, so that
108
+ // they bootstrap themselves, but keep the Zen1 side of the cluster alive.
109
+
110
+ // Set up a bridged network partition with the Zen1 nodes {0,1} on one side, Zen2 nodes {3,4} on the other, and node {2} in both
111
+ transportServices .get (0 ).addFailToSendNoConnectRule (transportServices .get (3 ));
112
+ transportServices .get (0 ).addFailToSendNoConnectRule (transportServices .get (4 ));
113
+ transportServices .get (1 ).addFailToSendNoConnectRule (transportServices .get (3 ));
114
+ transportServices .get (1 ).addFailToSendNoConnectRule (transportServices .get (4 ));
115
+ transportServices .get (3 ).addFailToSendNoConnectRule (transportServices .get (0 ));
116
+ transportServices .get (3 ).addFailToSendNoConnectRule (transportServices .get (1 ));
117
+ transportServices .get (4 ).addFailToSendNoConnectRule (transportServices .get (0 ));
118
+ transportServices .get (4 ).addFailToSendNoConnectRule (transportServices .get (1 ));
119
+
120
+ // Nodes 3 and 4 will bootstrap, but we want to keep node 2 as part of the Zen1 cluster, so prevent any messages that might switch
121
+ // its allegiance
122
+ transportServices .get (3 ).addFailToSendNoConnectRule (transportServices .get (2 ),
123
+ PUBLISH_STATE_ACTION_NAME , FOLLOWER_CHECK_ACTION_NAME , START_JOIN_ACTION_NAME );
124
+ transportServices .get (4 ).addFailToSendNoConnectRule (transportServices .get (2 ),
125
+ PUBLISH_STATE_ACTION_NAME , FOLLOWER_CHECK_ACTION_NAME , START_JOIN_ACTION_NAME );
126
+
127
+ logger .info ("--> waiting for disconnected nodes to be removed" );
128
+ ensureStableCluster (3 , nodes .get (0 ));
129
+
130
+ logger .info ("--> creating index on Zen1 side" );
131
+ assertAcked (client (nodes .get (0 )).admin ().indices ().create (new CreateIndexRequest ("test" )).get ());
132
+ assertFalse (client (nodes .get (0 )).admin ().cluster ().health (new ClusterHealthRequest ("test" )
133
+ .waitForGreenStatus ()).get ().isTimedOut ());
134
+
135
+ logger .info ("--> waiting for disconnected nodes to bootstrap themselves" );
136
+ assertBusy (() -> assertTrue (IntStream .range (3 , 5 )
137
+ .mapToObj (n -> (Coordinator ) internalCluster ().getInstance (Discovery .class , nodes .get (n )))
138
+ .anyMatch (Coordinator ::isInitialConfigurationSet )));
139
+
140
+ logger .info ("--> clearing disruption and waiting for cluster to reform" );
141
+ transportServices .forEach (MockTransportService ::clearAllRules );
142
+
143
+ ensureStableCluster (5 , nodes .get (0 ));
144
+ assertFalse (client (nodes .get (0 )).admin ().cluster ().health (new ClusterHealthRequest ("test" )
145
+ .waitForGreenStatus ()).get ().isTimedOut ());
146
+ }
147
+
82
148
public void testMixedClusterFormation () throws Exception {
83
149
final int zen1NodeCount = randomIntBetween (1 , 3 );
84
150
final int zen2NodeCount = randomIntBetween (zen1NodeCount == 1 ? 2 : 1 , 3 );
0 commit comments