25
25
import org .elasticsearch .cluster .ClusterStateTaskConfig ;
26
26
import org .elasticsearch .cluster .ClusterStateTaskExecutor ;
27
27
import org .elasticsearch .cluster .ClusterStateTaskListener ;
28
+ import org .elasticsearch .cluster .NotMasterException ;
28
29
import org .elasticsearch .cluster .metadata .IndexMetaData ;
29
30
import org .elasticsearch .cluster .node .DiscoveryNode ;
30
31
import org .elasticsearch .cluster .routing .RoutingService ;
53
54
import java .io .IOException ;
54
55
import java .util .ArrayList ;
55
56
import java .util .List ;
57
+ import java .util .Locale ;
56
58
57
59
import static org .elasticsearch .cluster .routing .ShardRouting .readShardRoutingEntry ;
58
60
@@ -113,7 +115,7 @@ public void handleResponse(TransportResponse.Empty response) {
113
115
114
116
@ Override
115
117
public void handleException (TransportException exp ) {
116
- logger .warn ("failed to send failed shard to {} " , exp , masterNode );
118
+ logger .warn ("unexpected failure while sending request to [{}] to fail shard [{}] " , exp , masterNode , shardRoutingEntry );
117
119
listener .onShardFailedFailure (masterNode , exp );
118
120
}
119
121
});
@@ -122,22 +124,62 @@ public void handleException(TransportException exp) {
122
124
private class ShardFailedTransportHandler implements TransportRequestHandler <ShardRoutingEntry > {
123
125
@ Override
124
126
public void messageReceived (ShardRoutingEntry request , TransportChannel channel ) throws Exception {
125
- handleShardFailureOnMaster (request );
126
- channel .sendResponse (TransportResponse .Empty .INSTANCE );
127
+ handleShardFailureOnMaster (request , new ClusterStateTaskListener () {
128
+ @ Override
129
+ public void onFailure (String source , Throwable t ) {
130
+ logger .error ("unexpected failure while failing shard [{}]" , t , request .shardRouting );
131
+ try {
132
+ channel .sendResponse (t );
133
+ } catch (Throwable channelThrowable ) {
134
+ logger .warn ("failed to send failure [{}] while failing shard [{}]" , channelThrowable , t , request .shardRouting );
135
+ }
136
+ }
137
+
138
+ @ Override
139
+ public void onNoLongerMaster (String source ) {
140
+ logger .error ("no longer master while failing shard [{}]" , request .shardRouting );
141
+ try {
142
+ channel .sendResponse (new NotMasterException (source ));
143
+ } catch (Throwable channelThrowable ) {
144
+ logger .warn ("failed to send no longer master while failing shard [{}]" , channelThrowable , request .shardRouting );
145
+ }
146
+ }
147
+
148
+ @ Override
149
+ public void clusterStateProcessed (String source , ClusterState oldState , ClusterState newState ) {
150
+ try {
151
+ int numberOfUnassignedShards = newState .getRoutingNodes ().unassigned ().size ();
152
+ if (oldState != newState && numberOfUnassignedShards > 0 ) {
153
+ String reason = String .format (Locale .ROOT , "[%d] unassigned shards after failing shard [%s]" , numberOfUnassignedShards , request .shardRouting );
154
+ if (logger .isTraceEnabled ()) {
155
+ logger .trace (reason + ", scheduling a reroute" );
156
+ }
157
+ routingService .reroute (reason );
158
+ }
159
+ } finally {
160
+ try {
161
+ channel .sendResponse (TransportResponse .Empty .INSTANCE );
162
+ } catch (Throwable channelThrowable ) {
163
+ logger .warn ("failed to send response while failing shard [{}]" , channelThrowable , request .shardRouting );
164
+ }
165
+ }
166
+ }
167
+ }
168
+ );
127
169
}
128
170
}
129
171
130
- class ShardFailedClusterStateHandler implements ClusterStateTaskExecutor <ShardRoutingEntry >, ClusterStateTaskListener {
172
+ class ShardFailedClusterStateHandler implements ClusterStateTaskExecutor <ShardRoutingEntry > {
131
173
@ Override
132
174
public BatchResult <ShardRoutingEntry > execute (ClusterState currentState , List <ShardRoutingEntry > tasks ) throws Exception {
133
175
BatchResult .Builder <ShardRoutingEntry > batchResultBuilder = BatchResult .builder ();
134
- List <FailedRerouteAllocation .FailedShard > shardRoutingsToBeApplied = new ArrayList <>(tasks .size ());
176
+ List <FailedRerouteAllocation .FailedShard > failedShards = new ArrayList <>(tasks .size ());
135
177
for (ShardRoutingEntry task : tasks ) {
136
- shardRoutingsToBeApplied .add (new FailedRerouteAllocation .FailedShard (task .shardRouting , task .message , task .failure ));
178
+ failedShards .add (new FailedRerouteAllocation .FailedShard (task .shardRouting , task .message , task .failure ));
137
179
}
138
180
ClusterState maybeUpdatedState = currentState ;
139
181
try {
140
- RoutingAllocation .Result result = allocationService .applyFailedShards (currentState , shardRoutingsToBeApplied );
182
+ RoutingAllocation .Result result = allocationService .applyFailedShards (currentState , failedShards );
141
183
if (result .changed ()) {
142
184
maybeUpdatedState = ClusterState .builder (currentState ).routingResult (result ).build ();
143
185
}
@@ -147,31 +189,18 @@ public BatchResult<ShardRoutingEntry> execute(ClusterState currentState, List<Sh
147
189
}
148
190
return batchResultBuilder .build (maybeUpdatedState );
149
191
}
150
-
151
- @ Override
152
- public void clusterStateProcessed (String source , ClusterState oldState , ClusterState newState ) {
153
- if (oldState != newState && newState .getRoutingNodes ().unassigned ().size () > 0 ) {
154
- logger .trace ("unassigned shards after shard failures. scheduling a reroute." );
155
- routingService .reroute ("unassigned shards after shard failures, scheduling a reroute" );
156
- }
157
- }
158
-
159
- @ Override
160
- public void onFailure (String source , Throwable t ) {
161
- logger .error ("unexpected failure during [{}]" , t , source );
162
- }
163
192
}
164
193
165
194
private final ShardFailedClusterStateHandler shardFailedClusterStateHandler = new ShardFailedClusterStateHandler ();
166
195
167
- private void handleShardFailureOnMaster (final ShardRoutingEntry shardRoutingEntry ) {
196
+ private void handleShardFailureOnMaster (final ShardRoutingEntry shardRoutingEntry , ClusterStateTaskListener listener ) {
168
197
logger .warn ("{} received shard failed for {}" , shardRoutingEntry .failure , shardRoutingEntry .shardRouting .shardId (), shardRoutingEntry );
169
198
clusterService .submitStateUpdateTask (
170
199
"shard-failed (" + shardRoutingEntry .shardRouting + "), message [" + shardRoutingEntry .message + "]" ,
171
200
shardRoutingEntry ,
172
201
ClusterStateTaskConfig .build (Priority .HIGH ),
173
202
shardFailedClusterStateHandler ,
174
- shardFailedClusterStateHandler );
203
+ listener );
175
204
}
176
205
177
206
public void shardStarted (final ShardRouting shardRouting , String indexUUID , final String reason ) {
0 commit comments