Skip to content

Commit 48b4f08

Browse files
committed
Replication operation that try to perform the primary phase on a replica should be retried
In extreme cases a local primary shard can be replaced with a replica while a replication request is in flight and the primary action is applied to the shard (via `acquirePrimaryOperationLock()). #17044 changed the exception used in that method to something that isn't recognized as `TransportActions.isShardNotAvailableException`, causing the operation to fail immediately instead of retrying. This commit fixes this by check the primary flag before acquiring the lock. This is safe to do as an IndexShard will never be demoted once a primary. Closes #17358
1 parent 833fc84 commit 48b4f08

File tree

3 files changed

+87
-24
lines changed

3 files changed

+87
-24
lines changed

core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

+9-2
Original file line numberDiff line numberDiff line change
@@ -549,8 +549,9 @@ public void handleResponse(Response response) {
549549
public void handleException(TransportException exp) {
550550
try {
551551
// if we got disconnected from the node, or the node / shard is not in the right state (being closed)
552-
if (exp.unwrapCause() instanceof ConnectTransportException || exp.unwrapCause() instanceof NodeClosedException ||
553-
(isPrimaryAction && retryPrimaryException(exp.unwrapCause()))) {
552+
final Throwable cause = exp.unwrapCause();
553+
if (cause instanceof ConnectTransportException || cause instanceof NodeClosedException ||
554+
(isPrimaryAction && retryPrimaryException(cause))) {
554555
logger.trace("received an error from node [{}] for request [{}], scheduling a retry", exp, node.id(), request);
555556
retry(exp);
556557
} else {
@@ -799,6 +800,12 @@ void finishBecauseUnavailable(ShardId shardId, String message) {
799800
protected IndexShardReference getIndexShardReferenceOnPrimary(ShardId shardId) {
800801
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
801802
IndexShard indexShard = indexService.getShard(shardId.id());
803+
// we may end up here if the cluster state used to route the primary is so stale that the underlying
804+
// index shard was replaced with a replica. For example - in a two node cluster, if the primary fails
805+
// the replica will take over and a replica will be assigned to the first node.
806+
if (indexShard.routingEntry().primary() == false) {
807+
throw new RetryOnPrimaryException(indexShard.shardId(), "actual shard is not a primary " + indexShard.routingEntry());
808+
}
802809
return IndexShardReferenceImpl.createOnPrimary(indexShard);
803810
}
804811

core/src/main/java/org/elasticsearch/index/shard/IndexShard.java

-1
Original file line numberDiff line numberDiff line change
@@ -974,7 +974,6 @@ private void ensureWriteAllowed(Engine.Operation op) throws IllegalIndexShardSta
974974

975975
private void verifyPrimary() {
976976
if (shardRouting.primary() == false) {
977-
// must use exception that is not ignored by replication logic. See TransportActions.isShardNotAvailableException
978977
throw new IllegalStateException("shard is not a primary " + shardRouting);
979978
}
980979
}

core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java

+78-21
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@
5050
import org.elasticsearch.common.io.stream.StreamOutput;
5151
import org.elasticsearch.common.settings.Settings;
5252
import org.elasticsearch.index.IndexNotFoundException;
53+
import org.elasticsearch.index.engine.EngineClosedException;
54+
import org.elasticsearch.index.shard.IndexShardClosedException;
5355
import org.elasticsearch.index.shard.IndexShardNotStartedException;
5456
import org.elasticsearch.index.shard.IndexShardState;
5557
import org.elasticsearch.index.shard.ShardId;
@@ -158,15 +160,15 @@ public void testBlocks() throws ExecutionException, InterruptedException {
158160
ReplicationTask task = maybeTask();
159161

160162
ClusterBlocks.Builder block = ClusterBlocks.builder()
161-
.addGlobalBlock(new ClusterBlock(1, "non retryable", false, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL));
163+
.addGlobalBlock(new ClusterBlock(1, "non retryable", false, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL));
162164
setState(clusterService, ClusterState.builder(clusterService.state()).blocks(block));
163165
TransportReplicationAction.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener);
164166
reroutePhase.run();
165167
assertListenerThrows("primary phase should fail operation", listener, ClusterBlockException.class);
166168
assertPhase(task, "failed");
167169

168170
block = ClusterBlocks.builder()
169-
.addGlobalBlock(new ClusterBlock(1, "retryable", true, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL));
171+
.addGlobalBlock(new ClusterBlock(1, "retryable", true, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL));
170172
setState(clusterService, ClusterState.builder(clusterService.state()).blocks(block));
171173
listener = new PlainActionFuture<>();
172174
reroutePhase = action.new ReroutePhase(task, new Request().timeout("5ms"), listener);
@@ -181,7 +183,7 @@ public void testBlocks() throws ExecutionException, InterruptedException {
181183
assertPhase(task, "waiting_for_retry");
182184

183185
block = ClusterBlocks.builder()
184-
.addGlobalBlock(new ClusterBlock(1, "non retryable", false, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL));
186+
.addGlobalBlock(new ClusterBlock(1, "non retryable", false, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL));
185187
setState(clusterService, ClusterState.builder(clusterService.state()).blocks(block));
186188
assertListenerThrows("primary phase should fail operation when moving from a retryable block to a non-retryable one", listener, ClusterBlockException.class);
187189
assertIndexShardUninitialized();
@@ -196,7 +198,7 @@ public void testNotStartedPrimary() throws InterruptedException, ExecutionExcept
196198
final ShardId shardId = new ShardId(index, "_na_", 0);
197199
// no replicas in oder to skip the replication part
198200
setState(clusterService, state(index, true,
199-
randomBoolean() ? ShardRoutingState.INITIALIZING : ShardRoutingState.UNASSIGNED));
201+
randomBoolean() ? ShardRoutingState.INITIALIZING : ShardRoutingState.UNASSIGNED));
200202
ReplicationTask task = maybeTask();
201203

202204
logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint());
@@ -221,7 +223,7 @@ public void testNotStartedPrimary() throws InterruptedException, ExecutionExcept
221223
final IndexShardRoutingTable shardRoutingTable = clusterService.state().routingTable().index(index).shard(shardId.id());
222224
final String primaryNodeId = shardRoutingTable.primaryShard().currentNodeId();
223225
final List<CapturingTransport.CapturedRequest> capturedRequests =
224-
transport.getCapturedRequestsByTargetNodeAndClear().get(primaryNodeId);
226+
transport.getCapturedRequestsByTargetNodeAndClear().get(primaryNodeId);
225227
assertThat(capturedRequests, notNullValue());
226228
assertThat(capturedRequests.size(), equalTo(1));
227229
assertThat(capturedRequests.get(0).action, equalTo("testAction[p]"));
@@ -234,7 +236,7 @@ public void testNotStartedPrimary() throws InterruptedException, ExecutionExcept
234236
* before the relocation target, there is a time span where relocation source believes active primary to be on
235237
* relocation target and relocation target believes active primary to be on relocation source. This results in replication
236238
* requests being sent back and forth.
237-
*
239+
* <p>
238240
* This test checks that replication request is not routed back from relocation target to relocation source in case of
239241
* stale index routing table on relocation target.
240242
*/
@@ -271,7 +273,7 @@ public void testNoRerouteOnStaleClusterState() throws InterruptedException, Exec
271273
IndexShardRoutingTable shardRoutingTable = clusterService.state().routingTable().index(index).shard(shardId.id());
272274
final String primaryNodeId = shardRoutingTable.primaryShard().currentNodeId();
273275
final List<CapturingTransport.CapturedRequest> capturedRequests =
274-
transport.getCapturedRequestsByTargetNodeAndClear().get(primaryNodeId);
276+
transport.getCapturedRequestsByTargetNodeAndClear().get(primaryNodeId);
275277
assertThat(capturedRequests, notNullValue());
276278
assertThat(capturedRequests.size(), equalTo(1));
277279
assertThat(capturedRequests.get(0).action, equalTo("testAction[p]"));
@@ -282,7 +284,7 @@ public void testUnknownIndexOrShardOnReroute() throws InterruptedException {
282284
final String index = "test";
283285
// no replicas in oder to skip the replication part
284286
setState(clusterService, state(index, true,
285-
randomBoolean() ? ShardRoutingState.INITIALIZING : ShardRoutingState.UNASSIGNED));
287+
randomBoolean() ? ShardRoutingState.INITIALIZING : ShardRoutingState.UNASSIGNED));
286288
logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint());
287289
Request request = new Request(new ShardId("unknown_index", "_na_", 0)).timeout("1ms");
288290
PlainActionFuture<Response> listener = new PlainActionFuture<>();
@@ -299,6 +301,61 @@ public void testUnknownIndexOrShardOnReroute() throws InterruptedException {
299301
assertListenerThrows("must throw shard not found exception", listener, ShardNotFoundException.class);
300302
}
301303

304+
public void testStalePrimaryShardOnReroute() throws InterruptedException {
305+
final String index = "test";
306+
final ShardId shardId = new ShardId(index, "_na_", 0);
307+
// no replicas in order to skip the replication part
308+
setState(clusterService, stateWithActivePrimary(index, true, randomInt(3)));
309+
logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint());
310+
Request request = new Request(shardId);
311+
boolean timeout = randomBoolean();
312+
if (timeout) {
313+
request.timeout("0s");
314+
} else {
315+
request.timeout("1h");
316+
}
317+
PlainActionFuture<Response> listener = new PlainActionFuture<>();
318+
ReplicationTask task = maybeTask();
319+
320+
TransportReplicationAction.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener);
321+
reroutePhase.run();
322+
CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear();
323+
assertThat(capturedRequests, arrayWithSize(1));
324+
assertThat(capturedRequests[0].action, equalTo("testAction[p]"));
325+
assertPhase(task, "waiting_on_primary");
326+
transport.handleRemoteError(capturedRequests[0].requestId, randomRetryPrimaryException(shardId));
327+
328+
329+
if (timeout) {
330+
// we always try at least one more time on timeout
331+
assertThat(listener.isDone(), equalTo(false));
332+
capturedRequests = transport.getCapturedRequestsAndClear();
333+
assertThat(capturedRequests, arrayWithSize(1));
334+
assertThat(capturedRequests[0].action, equalTo("testAction[p]"));
335+
assertPhase(task, "waiting_on_primary");
336+
transport.handleRemoteError(capturedRequests[0].requestId, randomRetryPrimaryException(shardId));
337+
assertListenerThrows("must throw index not found exception", listener, ElasticsearchException.class);
338+
assertPhase(task, "failed");
339+
} else {
340+
assertThat(listener.isDone(), equalTo(false));
341+
// generate a CS change
342+
setState(clusterService, clusterService.state());
343+
capturedRequests = transport.getCapturedRequestsAndClear();
344+
assertThat(capturedRequests, arrayWithSize(1));
345+
assertThat(capturedRequests[0].action, equalTo("testAction[p]"));
346+
}
347+
}
348+
349+
private ElasticsearchException randomRetryPrimaryException(ShardId shardId) {
350+
return randomFrom(
351+
new ShardNotFoundException(shardId),
352+
new IndexNotFoundException(shardId.getIndex()),
353+
new IndexShardClosedException(shardId),
354+
new EngineClosedException(shardId),
355+
new TransportReplicationAction.RetryOnPrimaryException(shardId, "hello")
356+
);
357+
}
358+
302359
public void testRoutePhaseExecutesRequest() {
303360
final String index = "test";
304361
final ShardId shardId = new ShardId(index, "_na_", 0);
@@ -449,7 +506,7 @@ protected Tuple<Response, Request> shardOperationOnPrimary(MetaData metaData, Re
449506
PlainActionFuture<Response> listener = new PlainActionFuture<>();
450507
ReplicationTask task = maybeTask();
451508
TransportReplicationAction<Request, Request, Response>.PrimaryPhase primaryPhase = actionWithRelocatingReplicasAfterPrimaryOp.new PrimaryPhase(
452-
task, request, createTransportChannel(listener));
509+
task, request, createTransportChannel(listener));
453510
primaryPhase.run();
454511
assertThat("request was not processed on primary", request.processedOnPrimary.get(), equalTo(true));
455512
ShardRouting relocatingReplicaShard = stateWithRelocatingReplica.getRoutingTable().shardRoutingTable(index, shardId.id()).replicaShards().get(0);
@@ -485,7 +542,7 @@ protected Tuple<Response, Request> shardOperationOnPrimary(MetaData metaData, Re
485542
PlainActionFuture<Response> listener = new PlainActionFuture<>();
486543
ReplicationTask task = maybeTask();
487544
TransportReplicationAction<Request, Request, Response>.PrimaryPhase primaryPhase = actionWithDeletedIndexAfterPrimaryOp.new PrimaryPhase(
488-
task, request, createTransportChannel(listener));
545+
task, request, createTransportChannel(listener));
489546
primaryPhase.run();
490547
assertThat("request was not processed on primary", request.processedOnPrimary.get(), equalTo(true));
491548
assertThat("replication phase should be skipped if index gets deleted after primary operation", transport.capturedRequestsByTargetNode().size(), equalTo(0));
@@ -529,8 +586,8 @@ public void testWriteConsistency() throws ExecutionException, InterruptedExcepti
529586

530587
setState(clusterService, state(index, true, ShardRoutingState.STARTED, replicaStates));
531588
logger.debug("using consistency level of [{}], assigned shards [{}], total shards [{}]. expecting op to [{}]. using state: \n{}",
532-
request.consistencyLevel(), 1 + assignedReplicas, 1 + assignedReplicas + unassignedReplicas, passesWriteConsistency ? "succeed" : "retry",
533-
clusterService.state().prettyPrint());
589+
request.consistencyLevel(), 1 + assignedReplicas, 1 + assignedReplicas + unassignedReplicas, passesWriteConsistency ? "succeed" : "retry",
590+
clusterService.state().prettyPrint());
534591

535592
final IndexShardRoutingTable shardRoutingTable = clusterService.state().routingTable().index(index).shard(shardId.id());
536593
PlainActionFuture<Response> listener = new PlainActionFuture<>();
@@ -646,7 +703,7 @@ protected void runReplicateTest(ClusterState state, IndexShardRoutingTable shard
646703

647704
TransportChannel channel = createTransportChannel(listener, error::set);
648705
TransportReplicationAction<Request, Request, Response>.ReplicationPhase replicationPhase =
649-
action.new ReplicationPhase(task, request, new Response(), request.shardId(), channel, reference);
706+
action.new ReplicationPhase(task, request, new Response(), request.shardId(), channel, reference);
650707

651708
assertThat(replicationPhase.totalShards(), equalTo(totalShards));
652709
assertThat(replicationPhase.pending(), equalTo(assignedReplicas));
@@ -656,7 +713,7 @@ protected void runReplicateTest(ClusterState state, IndexShardRoutingTable shard
656713

657714
HashMap<String, Request> nodesSentTo = new HashMap<>();
658715
boolean executeOnReplica =
659-
action.shouldExecuteReplication(clusterService.state().getMetaData().index(shardId.getIndex()).getSettings());
716+
action.shouldExecuteReplication(clusterService.state().getMetaData().index(shardId.getIndex()).getSettings());
660717
for (CapturingTransport.CapturedRequest capturedRequest : capturedRequests) {
661718
// no duplicate requests
662719
Request replicationRequest = (Request) capturedRequest.request;
@@ -819,7 +876,7 @@ public void testCounterIncrementedWhileReplicationOngoing() throws InterruptedEx
819876
final ShardId shardId = new ShardId(index, "_na_", 0);
820877
// one replica to make sure replication is attempted
821878
setState(clusterService, state(index, true,
822-
ShardRoutingState.STARTED, ShardRoutingState.STARTED));
879+
ShardRoutingState.STARTED, ShardRoutingState.STARTED));
823880
ShardRouting primaryShard = clusterService.state().routingTable().shardRoutingTable(shardId).primaryShard();
824881
indexShardRouting.set(primaryShard);
825882
logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint());
@@ -856,7 +913,7 @@ public void testCounterIncrementedWhileReplicationOngoing() throws InterruptedEx
856913
public void testReplicasCounter() throws Exception {
857914
final ShardId shardId = new ShardId("test", "_na_", 0);
858915
setState(clusterService, state(shardId.getIndexName(), true,
859-
ShardRoutingState.STARTED, ShardRoutingState.STARTED));
916+
ShardRoutingState.STARTED, ShardRoutingState.STARTED));
860917
action = new ActionWithDelay(Settings.EMPTY, "testActionWithExceptions", transportService, clusterService, threadPool);
861918
final Action.ReplicaOperationTransportHandler replicaOperationTransportHandler = action.new ReplicaOperationTransportHandler();
862919
final ReplicationTask task = maybeTask();
@@ -895,7 +952,7 @@ public void testCounterDecrementedIfShardOperationThrowsException() throws Inter
895952
final String index = "test";
896953
final ShardId shardId = new ShardId(index, "_na_", 0);
897954
setState(clusterService, state(index, true,
898-
ShardRoutingState.STARTED, ShardRoutingState.STARTED));
955+
ShardRoutingState.STARTED, ShardRoutingState.STARTED));
899956
logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint());
900957
Request request = new Request(shardId).timeout("100ms");
901958
PlainActionFuture<Response> listener = new PlainActionFuture<>();
@@ -915,7 +972,7 @@ public void testReroutePhaseRetriedAfterDemotedPrimary() {
915972
final ShardId shardId = new ShardId(index, "_na_", 0);
916973
boolean localPrimary = true;
917974
setState(clusterService, state(index, localPrimary,
918-
ShardRoutingState.STARTED, ShardRoutingState.STARTED));
975+
ShardRoutingState.STARTED, ShardRoutingState.STARTED));
919976
Action action = new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool) {
920977
@Override
921978
protected void resolveRequest(MetaData metaData, String concreteIndex, Request request) {
@@ -967,7 +1024,7 @@ protected void resolveRequest(MetaData metaData, String concreteIndex, Request r
9671024
// publish a new cluster state
9681025
boolean localPrimaryOnRetry = randomBoolean();
9691026
setState(clusterService, state(index, localPrimaryOnRetry,
970-
ShardRoutingState.STARTED, ShardRoutingState.STARTED));
1027+
ShardRoutingState.STARTED, ShardRoutingState.STARTED));
9711028
CapturingTransport.CapturedRequest[] primaryRetry = transport.getCapturedRequestsAndClear();
9721029

9731030
// the request should be retried
@@ -1083,8 +1140,8 @@ class Action extends TransportReplicationAction<Request, Request, Response> {
10831140
ClusterService clusterService,
10841141
ThreadPool threadPool) {
10851142
super(settings, actionName, transportService, clusterService, null, threadPool,
1086-
new ShardStateAction(settings, clusterService, transportService, null, null, threadPool),
1087-
new ActionFilters(new HashSet<ActionFilter>()), new IndexNameExpressionResolver(Settings.EMPTY), Request::new, Request::new, ThreadPool.Names.SAME);
1143+
new ShardStateAction(settings, clusterService, transportService, null, null, threadPool),
1144+
new ActionFilters(new HashSet<ActionFilter>()), new IndexNameExpressionResolver(Settings.EMPTY), Request::new, Request::new, ThreadPool.Names.SAME);
10881145
}
10891146

10901147
@Override

0 commit comments

Comments
 (0)