Skip to content

Commit 41706c3

Browse files
committed
Don’t ack if unable to remove failing replica (#39584)
Today when a replicated write operation fails to execute on a replica, the primary will reach out to the master to fail that replica (and mark it stale). We then won't ack that request until the master removes the failing replica; otherwise, we will lose the acked operation if the failed replica is still in the in-sync set. However, if a node with the primary is shutting down, we might ack such request even though we are unable to send a shard-failure request to the master. This happens because we ignore NodeClosedException which is triggered when the ClusterService is being closed. Closes #39467
1 parent be700f8 commit 41706c3

File tree

10 files changed

+135
-130
lines changed

10 files changed

+135
-130
lines changed

server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java

+2-5
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@
4646

4747
import java.io.IOException;
4848
import java.util.Objects;
49-
import java.util.function.Consumer;
5049

5150
public class TransportVerifyShardBeforeCloseAction extends TransportReplicationAction<
5251
TransportVerifyShardBeforeCloseAction.ShardRequest, TransportVerifyShardBeforeCloseAction.ShardRequest, ReplicationResponse> {
@@ -130,10 +129,8 @@ class VerifyShardBeforeCloseActionReplicasProxy extends ReplicasProxy {
130129
}
131130

132131
@Override
133-
public void markShardCopyAsStaleIfNeeded(final ShardId shardId, final String allocationId, final Runnable onSuccess,
134-
final Consumer<Exception> onPrimaryDemoted, final Consumer<Exception> onIgnoredFailure) {
135-
shardStateAction.remoteShardFailed(shardId, allocationId, primaryTerm, true, "mark copy as stale", null,
136-
createShardActionListener(onSuccess, onPrimaryDemoted, onIgnoredFailure));
132+
public void markShardCopyAsStaleIfNeeded(final ShardId shardId, final String allocationId, final ActionListener<Void> listener) {
133+
shardStateAction.remoteShardFailed(shardId, allocationId, primaryTerm, true, "mark copy as stale", null, listener);
137134
}
138135
}
139136

server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java

+3-5
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@
4848
import org.elasticsearch.transport.TransportService;
4949

5050
import java.io.IOException;
51-
import java.util.function.Consumer;
5251
import java.util.function.Supplier;
5352

5453
public class TransportResyncReplicationAction extends TransportWriteAction<ResyncReplicationRequest,
@@ -210,10 +209,9 @@ class ResyncActionReplicasProxy extends ReplicasProxy {
210209
}
211210

212211
@Override
213-
public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, Runnable onSuccess,
214-
Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
215-
shardStateAction.remoteShardFailed(replica.shardId(), replica.allocationId().getId(), primaryTerm, false, message, exception,
216-
createShardActionListener(onSuccess, onPrimaryDemoted, onIgnoredFailure));
212+
public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, ActionListener<Void> listener) {
213+
shardStateAction.remoteShardFailed(
214+
replica.shardId(), replica.allocationId().getId(), primaryTerm, false, message, exception, listener);
217215
}
218216
}
219217
}

server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java

+36-32
Original file line numberDiff line numberDiff line change
@@ -21,20 +21,24 @@
2121
import org.apache.logging.log4j.Logger;
2222
import org.apache.logging.log4j.message.ParameterizedMessage;
2323
import org.apache.lucene.store.AlreadyClosedException;
24+
import org.elasticsearch.Assertions;
2425
import org.elasticsearch.ElasticsearchException;
2526
import org.elasticsearch.ExceptionsHelper;
2627
import org.elasticsearch.action.ActionListener;
2728
import org.elasticsearch.action.UnavailableShardsException;
2829
import org.elasticsearch.action.support.ActiveShardCount;
2930
import org.elasticsearch.action.support.TransportActions;
31+
import org.elasticsearch.cluster.action.shard.ShardStateAction;
3032
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
3133
import org.elasticsearch.cluster.routing.ShardRouting;
3234
import org.elasticsearch.common.Nullable;
3335
import org.elasticsearch.common.io.stream.StreamInput;
3436
import org.elasticsearch.index.seqno.SequenceNumbers;
3537
import org.elasticsearch.index.shard.ReplicationGroup;
3638
import org.elasticsearch.index.shard.ShardId;
39+
import org.elasticsearch.node.NodeClosedException;
3740
import org.elasticsearch.rest.RestStatus;
41+
import org.elasticsearch.transport.TransportException;
3842

3943
import java.io.IOException;
4044
import java.util.ArrayList;
@@ -43,7 +47,6 @@
4347
import java.util.Locale;
4448
import java.util.concurrent.atomic.AtomicBoolean;
4549
import java.util.concurrent.atomic.AtomicInteger;
46-
import java.util.function.Consumer;
4750

4851
public class ReplicationOperation<
4952
Request extends ReplicationRequest<Request>,
@@ -133,10 +136,7 @@ private void markUnavailableShardsAsStale(ReplicaRequest replicaRequest, Replica
133136
for (String allocationId : replicationGroup.getUnavailableInSyncShards()) {
134137
pendingActions.incrementAndGet();
135138
replicasProxy.markShardCopyAsStaleIfNeeded(replicaRequest.shardId(), allocationId,
136-
ReplicationOperation.this::decPendingAndFinishIfNeeded,
137-
ReplicationOperation.this::onPrimaryDemoted,
138-
throwable -> decPendingAndFinishIfNeeded()
139-
);
139+
ActionListener.wrap(r -> decPendingAndFinishIfNeeded(), ReplicationOperation.this::onNoLongerPrimary));
140140
}
141141
}
142142

@@ -192,20 +192,32 @@ public void onFailure(Exception replicaException) {
192192
shard.shardId(), shard.currentNodeId(), replicaException, restStatus, false));
193193
}
194194
String message = String.format(Locale.ROOT, "failed to perform %s on replica %s", opType, shard);
195-
replicasProxy.failShardIfNeeded(shard, message,
196-
replicaException, ReplicationOperation.this::decPendingAndFinishIfNeeded,
197-
ReplicationOperation.this::onPrimaryDemoted, throwable -> decPendingAndFinishIfNeeded());
195+
replicasProxy.failShardIfNeeded(shard, message, replicaException,
196+
ActionListener.wrap(r -> decPendingAndFinishIfNeeded(), ReplicationOperation.this::onNoLongerPrimary));
198197
}
199198
});
200199
}
201200

202-
private void onPrimaryDemoted(Exception demotionFailure) {
203-
String primaryFail = String.format(Locale.ROOT,
204-
"primary shard [%s] was demoted while failing replica shard",
205-
primary.routingEntry());
206-
// we are no longer the primary, fail ourselves and start over
207-
primary.failShard(primaryFail, demotionFailure);
208-
finishAsFailed(new RetryOnPrimaryException(primary.routingEntry().shardId(), primaryFail, demotionFailure));
201+
private void onNoLongerPrimary(Exception failure) {
202+
final boolean nodeIsClosing = failure instanceof NodeClosedException ||
203+
(failure instanceof TransportException && "TransportService is closed stopped can't send request".equals(failure.getMessage()));
204+
final String message;
205+
if (nodeIsClosing) {
206+
message = String.format(Locale.ROOT,
207+
"node with primary [%s] is shutting down while failing replica shard", primary.routingEntry());
208+
// We prefer not to fail the primary to avoid unnecessary warning log
209+
// when the node with the primary shard is gracefully shutting down.
210+
} else {
211+
if (Assertions.ENABLED) {
212+
if (failure instanceof ShardStateAction.NoLongerPrimaryShardException == false) {
213+
throw new AssertionError("unexpected failure", failure);
214+
}
215+
}
216+
// we are no longer the primary, fail ourselves and start over
217+
message = String.format(Locale.ROOT, "primary shard [%s] was demoted while failing replica shard", primary.routingEntry());
218+
primary.failShard(message, failure);
219+
}
220+
finishAsFailed(new RetryOnPrimaryException(primary.routingEntry().shardId(), message, failure));
209221
}
210222

211223
/**
@@ -365,31 +377,23 @@ void performOn(ShardRouting replica, RequestT replicaRequest, long globalCheckpo
365377
* of active shards. Whether a failure is needed is left up to the
366378
* implementation.
367379
*
368-
* @param replica shard to fail
369-
* @param message a (short) description of the reason
370-
* @param exception the original exception which caused the ReplicationOperation to request the shard to be failed
371-
* @param onSuccess a callback to call when the shard has been successfully removed from the active set.
372-
* @param onPrimaryDemoted a callback to call when the shard can not be failed because the current primary has been demoted
373-
* by the master.
374-
* @param onIgnoredFailure a callback to call when failing a shard has failed, but it that failure can be safely ignored and the
380+
* @param replica shard to fail
381+
* @param message a (short) description of the reason
382+
* @param exception the original exception which caused the ReplicationOperation to request the shard to be failed
383+
* @param listener a listener that will be notified when the failing shard has been removed from the in-sync set
375384
*/
376-
void failShardIfNeeded(ShardRouting replica, String message, Exception exception, Runnable onSuccess,
377-
Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure);
385+
void failShardIfNeeded(ShardRouting replica, String message, Exception exception, ActionListener<Void> listener);
378386

379387
/**
380388
* Marks shard copy as stale if needed, removing its allocation id from
381389
* the set of in-sync allocation ids. Whether marking as stale is needed
382390
* is left up to the implementation.
383391
*
384-
* @param shardId shard id
385-
* @param allocationId allocation id to remove from the set of in-sync allocation ids
386-
* @param onSuccess a callback to call when the allocation id has been successfully removed from the in-sync set.
387-
* @param onPrimaryDemoted a callback to call when the request failed because the current primary was already demoted
388-
* by the master.
389-
* @param onIgnoredFailure a callback to call when the request failed, but the failure can be safely ignored.
392+
* @param shardId shard id
393+
* @param allocationId allocation id to remove from the set of in-sync allocation ids
394+
* @param listener a listener that will be notified when the failing shard has been removed from the in-sync set
390395
*/
391-
void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, Runnable onSuccess,
392-
Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure);
396+
void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, ActionListener<Void> listener);
393397
}
394398

395399
/**

server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

+4-31
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,6 @@
8484
import java.util.Map;
8585
import java.util.Objects;
8686
import java.util.concurrent.atomic.AtomicBoolean;
87-
import java.util.function.Consumer;
8887
import java.util.function.Supplier;
8988

9089
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
@@ -1177,47 +1176,21 @@ public void performOn(
11771176
}
11781177

11791178
@Override
1180-
public void failShardIfNeeded(ShardRouting replica, String message, Exception exception,
1181-
Runnable onSuccess, Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
1179+
public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, ActionListener<Void> listener) {
11821180
// This does not need to fail the shard. The idea is that this
11831181
// is a non-write operation (something like a refresh or a global
11841182
// checkpoint sync) and therefore the replica should still be
11851183
// "alive" if it were to fail.
1186-
onSuccess.run();
1184+
listener.onResponse(null);
11871185
}
11881186

11891187
@Override
1190-
public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, Runnable onSuccess,
1191-
Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
1188+
public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, ActionListener<Void> listener) {
11921189
// This does not need to make the shard stale. The idea is that this
11931190
// is a non-write operation (something like a refresh or a global
11941191
// checkpoint sync) and therefore the replica should still be
11951192
// "alive" if it were to be marked as stale.
1196-
onSuccess.run();
1197-
}
1198-
1199-
protected final ActionListener<Void> createShardActionListener(final Runnable onSuccess,
1200-
final Consumer<Exception> onPrimaryDemoted,
1201-
final Consumer<Exception> onIgnoredFailure) {
1202-
return new ActionListener<Void>() {
1203-
@Override
1204-
public void onResponse(Void aVoid) {
1205-
onSuccess.run();
1206-
}
1207-
1208-
@Override
1209-
public void onFailure(Exception shardFailedError) {
1210-
if (shardFailedError instanceof ShardStateAction.NoLongerPrimaryShardException) {
1211-
onPrimaryDemoted.accept(shardFailedError);
1212-
} else {
1213-
// these can occur if the node is shutting down and are okay
1214-
// any other exception here is not expected and merits investigation
1215-
assert shardFailedError instanceof TransportException ||
1216-
shardFailedError instanceof NodeClosedException : shardFailedError;
1217-
onIgnoredFailure.accept(shardFailedError);
1218-
}
1219-
}
1220-
};
1193+
listener.onResponse(null);
12211194
}
12221195
}
12231196

server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java

+5-9
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@
4747
import java.util.concurrent.atomic.AtomicBoolean;
4848
import java.util.concurrent.atomic.AtomicInteger;
4949
import java.util.concurrent.atomic.AtomicReference;
50-
import java.util.function.Consumer;
5150
import java.util.function.Supplier;
5251

5352
/**
@@ -376,20 +375,17 @@ class WriteActionReplicasProxy extends ReplicasProxy {
376375
}
377376

378377
@Override
379-
public void failShardIfNeeded(ShardRouting replica, String message, Exception exception,
380-
Runnable onSuccess, Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
378+
public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, ActionListener<Void> listener) {
381379
if (TransportActions.isShardNotAvailableException(exception) == false) {
382380
logger.warn(new ParameterizedMessage("[{}] {}", replica.shardId(), message), exception);
383381
}
384-
shardStateAction.remoteShardFailed(replica.shardId(), replica.allocationId().getId(), primaryTerm, true, message, exception,
385-
createShardActionListener(onSuccess, onPrimaryDemoted, onIgnoredFailure));
382+
shardStateAction.remoteShardFailed(
383+
replica.shardId(), replica.allocationId().getId(), primaryTerm, true, message, exception, listener);
386384
}
387385

388386
@Override
389-
public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, Runnable onSuccess,
390-
Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
391-
shardStateAction.remoteShardFailed(shardId, allocationId, primaryTerm, true, "mark copy as stale", null,
392-
createShardActionListener(onSuccess, onPrimaryDemoted, onIgnoredFailure));
387+
public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, ActionListener<Void> listener) {
388+
shardStateAction.remoteShardFailed(shardId, allocationId, primaryTerm, true, "mark copy as stale", null, listener);
393389
}
394390
}
395391
}

0 commit comments

Comments
 (0)