21
21
import org .apache .logging .log4j .Logger ;
22
22
import org .apache .logging .log4j .message .ParameterizedMessage ;
23
23
import org .apache .lucene .store .AlreadyClosedException ;
24
+ import org .elasticsearch .Assertions ;
24
25
import org .elasticsearch .ElasticsearchException ;
25
26
import org .elasticsearch .ExceptionsHelper ;
26
27
import org .elasticsearch .action .ActionListener ;
27
28
import org .elasticsearch .action .UnavailableShardsException ;
28
29
import org .elasticsearch .action .support .ActiveShardCount ;
29
30
import org .elasticsearch .action .support .TransportActions ;
31
+ import org .elasticsearch .cluster .action .shard .ShardStateAction ;
30
32
import org .elasticsearch .cluster .routing .IndexShardRoutingTable ;
31
33
import org .elasticsearch .cluster .routing .ShardRouting ;
32
34
import org .elasticsearch .common .Nullable ;
33
35
import org .elasticsearch .common .io .stream .StreamInput ;
34
36
import org .elasticsearch .index .seqno .SequenceNumbers ;
35
37
import org .elasticsearch .index .shard .ReplicationGroup ;
36
38
import org .elasticsearch .index .shard .ShardId ;
39
+ import org .elasticsearch .node .NodeClosedException ;
37
40
import org .elasticsearch .rest .RestStatus ;
41
+ import org .elasticsearch .transport .TransportException ;
38
42
39
43
import java .io .IOException ;
40
44
import java .util .ArrayList ;
43
47
import java .util .Locale ;
44
48
import java .util .concurrent .atomic .AtomicBoolean ;
45
49
import java .util .concurrent .atomic .AtomicInteger ;
46
- import java .util .function .Consumer ;
47
50
48
51
public class ReplicationOperation <
49
52
Request extends ReplicationRequest <Request >,
@@ -133,10 +136,7 @@ private void markUnavailableShardsAsStale(ReplicaRequest replicaRequest, Replica
133
136
for (String allocationId : replicationGroup .getUnavailableInSyncShards ()) {
134
137
pendingActions .incrementAndGet ();
135
138
replicasProxy .markShardCopyAsStaleIfNeeded (replicaRequest .shardId (), allocationId ,
136
- ReplicationOperation .this ::decPendingAndFinishIfNeeded ,
137
- ReplicationOperation .this ::onPrimaryDemoted ,
138
- throwable -> decPendingAndFinishIfNeeded ()
139
- );
139
+ ActionListener .wrap (r -> decPendingAndFinishIfNeeded (), ReplicationOperation .this ::onNoLongerPrimary ));
140
140
}
141
141
}
142
142
@@ -192,9 +192,8 @@ public void onFailure(Exception replicaException) {
192
192
shard .shardId (), shard .currentNodeId (), replicaException , restStatus , false ));
193
193
}
194
194
String message = String .format (Locale .ROOT , "failed to perform %s on replica %s" , opType , shard );
195
- replicasProxy .failShardIfNeeded (shard , message ,
196
- replicaException , ReplicationOperation .this ::decPendingAndFinishIfNeeded ,
197
- ReplicationOperation .this ::onPrimaryDemoted , throwable -> decPendingAndFinishIfNeeded ());
195
+ replicasProxy .failShardIfNeeded (shard , message , replicaException ,
196
+ ActionListener .wrap (r -> decPendingAndFinishIfNeeded (), ReplicationOperation .this ::onNoLongerPrimary ));
198
197
}
199
198
200
199
@ Override
@@ -204,13 +203,26 @@ public String toString() {
204
203
});
205
204
}
206
205
207
- private void onPrimaryDemoted (Exception demotionFailure ) {
208
- String primaryFail = String .format (Locale .ROOT ,
209
- "primary shard [%s] was demoted while failing replica shard" ,
210
- primary .routingEntry ());
211
- // we are no longer the primary, fail ourselves and start over
212
- primary .failShard (primaryFail , demotionFailure );
213
- finishAsFailed (new RetryOnPrimaryException (primary .routingEntry ().shardId (), primaryFail , demotionFailure ));
206
+ private void onNoLongerPrimary (Exception failure ) {
207
+ final boolean nodeIsClosing = failure instanceof NodeClosedException ||
208
+ (failure instanceof TransportException && "TransportService is closed stopped can't send request" .equals (failure .getMessage ()));
209
+ final String message ;
210
+ if (nodeIsClosing ) {
211
+ message = String .format (Locale .ROOT ,
212
+ "node with primary [%s] is shutting down while failing replica shard" , primary .routingEntry ());
213
+ // We prefer not to fail the primary to avoid unnecessary warning log
214
+ // when the node with the primary shard is gracefully shutting down.
215
+ } else {
216
+ if (Assertions .ENABLED ) {
217
+ if (failure instanceof ShardStateAction .NoLongerPrimaryShardException == false ) {
218
+ throw new AssertionError ("unexpected failure" , failure );
219
+ }
220
+ }
221
+ // we are no longer the primary, fail ourselves and start over
222
+ message = String .format (Locale .ROOT , "primary shard [%s] was demoted while failing replica shard" , primary .routingEntry ());
223
+ primary .failShard (message , failure );
224
+ }
225
+ finishAsFailed (new RetryOnPrimaryException (primary .routingEntry ().shardId (), message , failure ));
214
226
}
215
227
216
228
/**
@@ -370,31 +382,23 @@ void performOn(ShardRouting replica, RequestT replicaRequest, long globalCheckpo
370
382
* of active shards. Whether a failure is needed is left up to the
371
383
* implementation.
372
384
*
373
- * @param replica shard to fail
374
- * @param message a (short) description of the reason
375
- * @param exception the original exception which caused the ReplicationOperation to request the shard to be failed
376
- * @param onSuccess a callback to call when the shard has been successfully removed from the active set.
377
- * @param onPrimaryDemoted a callback to call when the shard can not be failed because the current primary has been demoted
378
- * by the master.
379
- * @param onIgnoredFailure a callback to call when failing a shard has failed, but it that failure can be safely ignored and the
385
+ * @param replica shard to fail
386
+ * @param message a (short) description of the reason
387
+ * @param exception the original exception which caused the ReplicationOperation to request the shard to be failed
388
+ * @param listener a listener that will be notified when the failing shard has been removed from the in-sync set
380
389
*/
381
- void failShardIfNeeded (ShardRouting replica , String message , Exception exception , Runnable onSuccess ,
382
- Consumer <Exception > onPrimaryDemoted , Consumer <Exception > onIgnoredFailure );
390
+ void failShardIfNeeded (ShardRouting replica , String message , Exception exception , ActionListener <Void > listener );
383
391
384
392
/**
385
393
* Marks shard copy as stale if needed, removing its allocation id from
386
394
* the set of in-sync allocation ids. Whether marking as stale is needed
387
395
* is left up to the implementation.
388
396
*
389
- * @param shardId shard id
390
- * @param allocationId allocation id to remove from the set of in-sync allocation ids
391
- * @param onSuccess a callback to call when the allocation id has been successfully removed from the in-sync set.
392
- * @param onPrimaryDemoted a callback to call when the request failed because the current primary was already demoted
393
- * by the master.
394
- * @param onIgnoredFailure a callback to call when the request failed, but the failure can be safely ignored.
397
+ * @param shardId shard id
398
+ * @param allocationId allocation id to remove from the set of in-sync allocation ids
399
+ * @param listener a listener that will be notified when the allocation id has been removed from the in-sync set
395
400
*/
396
- void markShardCopyAsStaleIfNeeded (ShardId shardId , String allocationId , Runnable onSuccess ,
397
- Consumer <Exception > onPrimaryDemoted , Consumer <Exception > onIgnoredFailure );
401
+ void markShardCopyAsStaleIfNeeded (ShardId shardId , String allocationId , ActionListener <Void > listener );
398
402
}
399
403
400
404
/**
0 commit comments