18
18
*/
19
19
package org .elasticsearch .indices .state ;
20
20
21
+ import org .apache .logging .log4j .message .ParameterizedMessage ;
21
22
import org .elasticsearch .action .admin .cluster .reroute .ClusterRerouteRequest ;
22
23
import org .elasticsearch .action .support .master .AcknowledgedResponse ;
24
+ import org .elasticsearch .cluster .ClusterState ;
23
25
import org .elasticsearch .cluster .node .DiscoveryNode ;
24
26
import org .elasticsearch .cluster .routing .IndexRoutingTable ;
25
27
import org .elasticsearch .cluster .routing .ShardRouting ;
32
34
import org .elasticsearch .cluster .service .ClusterService ;
33
35
import org .elasticsearch .common .settings .Settings ;
34
36
import org .elasticsearch .common .util .concurrent .ConcurrentCollections ;
37
+ import org .elasticsearch .index .shard .ShardId ;
35
38
import org .elasticsearch .indices .recovery .PeerRecoverySourceService ;
36
39
import org .elasticsearch .indices .recovery .StartRecoveryRequest ;
37
40
import org .elasticsearch .plugins .Plugin ;
38
41
import org .elasticsearch .test .BackgroundIndexer ;
39
42
import org .elasticsearch .test .ESIntegTestCase ;
40
- import org .elasticsearch .test .junit .annotations .TestLogging ;
41
43
import org .elasticsearch .test .transport .MockTransportService ;
44
+ import org .elasticsearch .test .transport .StubbableTransport ;
42
45
import org .elasticsearch .transport .TransportService ;
43
46
44
47
import java .util .ArrayList ;
57
60
import static org .elasticsearch .indices .state .CloseIndexIT .assertIndexIsOpened ;
58
61
import static org .elasticsearch .test .hamcrest .ElasticsearchAssertions .assertAcked ;
59
62
import static org .hamcrest .Matchers .greaterThan ;
63
+ import static org .hamcrest .Matchers .hasSize ;
60
64
61
65
@ ESIntegTestCase .ClusterScope (minNumDataNodes = 2 )
62
66
public class CloseWhileRelocatingShardsIT extends ESIntegTestCase {
@@ -68,9 +72,11 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
68
72
69
73
@ Override
70
74
protected Settings nodeSettings (int nodeOrdinal ) {
75
+ final int maxRecoveries = Integer .MAX_VALUE ;
71
76
return Settings .builder ()
72
77
.put (super .nodeSettings (nodeOrdinal ))
73
- .put (ThrottlingAllocationDecider .CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING .getKey (), Integer .MAX_VALUE )
78
+ .put (ThrottlingAllocationDecider .CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING .getKey (), maxRecoveries )
79
+ .put (ThrottlingAllocationDecider .CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING .getKey (), maxRecoveries )
74
80
.put (ConcurrentRebalanceAllocationDecider .CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING .getKey (), -1 )
75
81
.build ();
76
82
}
@@ -80,7 +86,6 @@ protected int maximumNumberOfShards() {
80
86
return 3 ;
81
87
}
82
88
83
- @ TestLogging ("org.elasticsearch.cluster.metadata.MetaDataIndexStateService:DEBUG,org.elasticsearch.action.admin.indices.close:DEBUG" )
84
89
public void testCloseWhileRelocatingShards () throws Exception {
85
90
final String [] indices = new String [randomIntBetween (3 , 5 )];
86
91
final Map <String , Long > docsPerIndex = new HashMap <>();
@@ -119,21 +124,19 @@ public void testCloseWhileRelocatingShards() throws Exception {
119
124
120
125
final String targetNode = internalCluster ().startDataOnlyNode ();
121
126
ensureClusterSizeConsistency (); // wait for the master to finish processing join.
122
- final MockTransportService targetTransportService =
123
- (MockTransportService ) internalCluster ().getInstance (TransportService .class , targetNode );
124
127
125
- final Set <String > acknowledgedCloses = ConcurrentCollections .newConcurrentSet ();
126
128
try {
127
129
final ClusterService clusterService = internalCluster ().getInstance (ClusterService .class , internalCluster ().getMasterName ());
130
+ final ClusterState state = clusterService .state ();
128
131
final CountDownLatch latch = new CountDownLatch (indices .length );
129
- final CountDownLatch release = new CountDownLatch (1 );
132
+ final CountDownLatch release = new CountDownLatch (indices . length );
130
133
131
134
// relocate one shard for every index to be closed
132
135
final AllocationCommands commands = new AllocationCommands ();
133
136
for (final String index : indices ) {
134
137
final NumShards numShards = getNumShards (index );
135
138
final int shardId = numShards .numPrimaries == 1 ? 0 : randomIntBetween (0 , numShards .numPrimaries - 1 );
136
- final IndexRoutingTable indexRoutingTable = clusterService . state () .routingTable ().index (index );
139
+ final IndexRoutingTable indexRoutingTable = state .routingTable ().index (index );
137
140
138
141
final ShardRouting primary = indexRoutingTable .shard (shardId ).primaryShard ();
139
142
assertTrue (primary .started ());
@@ -146,24 +149,49 @@ public void testCloseWhileRelocatingShards() throws Exception {
146
149
currentNodeId = replica .currentNodeId ();
147
150
}
148
151
}
152
+ commands .add (new MoveAllocationCommand (index , shardId , state .nodes ().resolveNode (currentNodeId ).getName (), targetNode ));
153
+ }
154
+
155
+ // Build the list of shards for which recoveries will be blocked
156
+ final Set <ShardId > blockedShards = commands .commands ().stream ()
157
+ .map (c -> (MoveAllocationCommand ) c )
158
+ .map (c -> new ShardId (clusterService .state ().metaData ().index (c .index ()).getIndex (), c .shardId ()))
159
+ .collect (Collectors .toSet ());
160
+ assertThat (blockedShards , hasSize (indices .length ));
161
+
162
+ final Set <String > acknowledgedCloses = ConcurrentCollections .newConcurrentSet ();
163
+ final Set <String > interruptedRecoveries = ConcurrentCollections .newConcurrentSet ();
149
164
150
- final DiscoveryNode sourceNode = clusterService .state ().nodes ().resolveNode (primary .currentNodeId ());
151
- targetTransportService .addSendBehavior (internalCluster ().getInstance (TransportService .class , sourceNode .getName ()),
152
- (connection , requestId , action , request , options ) -> {
153
- if (PeerRecoverySourceService .Actions .START_RECOVERY .equals (action )) {
154
- logger .debug ("blocking recovery of shard {}" , ((StartRecoveryRequest ) request ).shardId ());
155
- latch .countDown ();
156
- try {
157
- release .await ();
158
- logger .debug ("releasing recovery of shard {}" , ((StartRecoveryRequest ) request ).shardId ());
159
- } catch (InterruptedException e ) {
160
- throw new AssertionError (e );
161
- }
162
- }
163
- connection .sendRequest (requestId , action , request , options );
165
+ // Create a SendRequestBehavior that will block outgoing start recovery request
166
+ final StubbableTransport .SendRequestBehavior sendBehavior = (connection , requestId , action , request , options ) -> {
167
+ if (PeerRecoverySourceService .Actions .START_RECOVERY .equals (action )) {
168
+ final StartRecoveryRequest startRecoveryRequest = ((StartRecoveryRequest ) request );
169
+ if (blockedShards .contains (startRecoveryRequest .shardId ())) {
170
+ logger .debug ("blocking recovery of shard {}" , startRecoveryRequest .shardId ());
171
+ latch .countDown ();
172
+ try {
173
+ release .await ();
174
+ logger .debug ("releasing recovery of shard {}" , startRecoveryRequest .shardId ());
175
+ } catch (final InterruptedException e ) {
176
+ logger .warn (() -> new ParameterizedMessage ("exception when releasing recovery of shard {}" ,
177
+ startRecoveryRequest .shardId ()), e );
178
+ interruptedRecoveries .add (startRecoveryRequest .shardId ().getIndexName ());
179
+ Thread .currentThread ().interrupt ();
180
+ return ;
164
181
}
165
- );
166
- commands .add (new MoveAllocationCommand (index , shardId , currentNodeId , targetNode ));
182
+ }
183
+ }
184
+ connection .sendRequest (requestId , action , request , options );
185
+ };
186
+
187
+ final MockTransportService targetTransportService =
188
+ (MockTransportService ) internalCluster ().getInstance (TransportService .class , targetNode );
189
+
190
+ for (DiscoveryNode node : state .getNodes ()) {
191
+ if (node .isDataNode () && node .getName ().equals (targetNode ) == false ) {
192
+ final TransportService sourceTransportService = internalCluster ().getInstance (TransportService .class , node .getName ());
193
+ targetTransportService .addSendBehavior (sourceTransportService , sendBehavior );
194
+ }
167
195
}
168
196
169
197
assertAcked (client ().admin ().cluster ().reroute (new ClusterRerouteRequest ().commands (commands )).get ());
@@ -222,12 +250,15 @@ public void testCloseWhileRelocatingShards() throws Exception {
222
250
223
251
targetTransportService .clearAllRules ();
224
252
253
+ // If a shard recovery has been interrupted, we expect its index to be closed
254
+ interruptedRecoveries .forEach (CloseIndexIT ::assertIndexIsClosed );
255
+
225
256
assertThat ("Consider that the test failed if no indices were successfully closed" , acknowledgedCloses .size (), greaterThan (0 ));
226
257
assertAcked (client ().admin ().indices ().prepareOpen ("index-*" ));
227
258
ensureGreen (indices );
228
259
229
260
for (String index : acknowledgedCloses ) {
230
- long docsCount = client ().prepareSearch (index ).setSize (0 ).get ().getHits ().getTotalHits ().value ;
261
+ long docsCount = client ().prepareSearch (index ).setSize (0 ).setTrackTotalHits ( true ). get ().getHits ().getTotalHits ().value ;
231
262
assertEquals ("Expected " + docsPerIndex .get (index ) + " docs in index " + index + " but got " + docsCount
232
263
+ " (close acknowledged=" + acknowledgedCloses .contains (index ) + ")" , (long ) docsPerIndex .get (index ), docsCount );
233
264
}
0 commit comments