@@ -119,6 +119,8 @@ public void testCloseWhileRelocatingShards() throws Exception {
119
119
120
120
final String targetNode = internalCluster ().startDataOnlyNode ();
121
121
ensureClusterSizeConsistency (); // wait for the master to finish processing join.
122
+ final MockTransportService targetTransportService =
123
+ (MockTransportService ) internalCluster ().getInstance (TransportService .class , targetNode );
122
124
123
125
final Set <String > acknowledgedCloses = ConcurrentCollections .newConcurrentSet ();
124
126
try {
@@ -146,8 +148,7 @@ public void testCloseWhileRelocatingShards() throws Exception {
146
148
}
147
149
148
150
final DiscoveryNode sourceNode = clusterService .state ().nodes ().resolveNode (primary .currentNodeId ());
149
- ((MockTransportService ) internalCluster ().getInstance (TransportService .class , targetNode ))
150
- .addSendBehavior (internalCluster ().getInstance (TransportService .class , sourceNode .getName ()),
151
+ targetTransportService .addSendBehavior (internalCluster ().getInstance (TransportService .class , sourceNode .getName ()),
151
152
(connection , requestId , action , request , options ) -> {
152
153
if (PeerRecoverySourceService .Actions .START_RECOVERY .equals (action )) {
153
154
logger .debug ("blocking recovery of shard {}" , ((StartRecoveryRequest ) request ).shardId ());
@@ -210,28 +211,30 @@ public void testCloseWhileRelocatingShards() throws Exception {
210
211
}
211
212
}
212
213
}
213
- } finally {
214
- assertAcked (client ().admin ().cluster ().prepareUpdateSettings ()
215
- .setTransientSettings (Settings .builder ()
216
- .putNull (EnableAllocationDecider .CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING .getKey ())));
217
- }
218
214
219
- for (String index : indices ) {
220
- if (acknowledgedCloses .contains (index )) {
221
- assertIndexIsClosed (index );
222
- } else {
223
- assertIndexIsOpened (index );
215
+ for (String index : indices ) {
216
+ if (acknowledgedCloses .contains (index )) {
217
+ assertIndexIsClosed (index );
218
+ } else {
219
+ assertIndexIsOpened (index );
220
+ }
224
221
}
225
- }
226
222
227
- assertThat ("Consider that the test failed if no indices were successfully closed" , acknowledgedCloses .size (), greaterThan (0 ));
228
- assertAcked (client ().admin ().indices ().prepareOpen ("index-*" ));
229
- ensureGreen (indices );
223
+ targetTransportService .clearAllRules ();
224
+
225
+ assertThat ("Consider that the test failed if no indices were successfully closed" , acknowledgedCloses .size (), greaterThan (0 ));
226
+ assertAcked (client ().admin ().indices ().prepareOpen ("index-*" ));
227
+ ensureGreen (indices );
230
228
231
- for (String index : acknowledgedCloses ) {
232
- long docsCount = client ().prepareSearch (index ).setSize (0 ).get ().getHits ().getTotalHits ().value ;
233
- assertEquals ("Expected " + docsPerIndex .get (index ) + " docs in index " + index + " but got " + docsCount
234
- + " (close acknowledged=" + acknowledgedCloses .contains (index ) + ")" , (long ) docsPerIndex .get (index ), docsCount );
229
+ for (String index : acknowledgedCloses ) {
230
+ long docsCount = client ().prepareSearch (index ).setSize (0 ).get ().getHits ().getTotalHits ().value ;
231
+ assertEquals ("Expected " + docsPerIndex .get (index ) + " docs in index " + index + " but got " + docsCount
232
+ + " (close acknowledged=" + acknowledgedCloses .contains (index ) + ")" , (long ) docsPerIndex .get (index ), docsCount );
233
+ }
234
+ } finally {
235
+ assertAcked (client ().admin ().cluster ().prepareUpdateSettings ()
236
+ .setTransientSettings (Settings .builder ()
237
+ .putNull (EnableAllocationDecider .CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING .getKey ())));
235
238
}
236
239
}
237
240
}
0 commit comments