Skip to content

Commit 3f64076

Browse files
committed
bring back the primary shard chasing to the PrimaryPhase
1 parent 2abe7c4 commit 3f64076

File tree

2 files changed

+86
-46
lines changed

2 files changed

+86
-46
lines changed

core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

+79-37
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,8 @@ public void onFailure(Throwable t) {
293293
@Override
294294
protected void doRun() throws Exception {
295295
final ShardRouting primary = request.internalShardRouting;
296+
// Although this gets executed locally, this is more of an assertion, but if we change the primary action
297+
// to be performed remotely, this check is important to perform before executing the action:
296298
if (clusterService.localNode().id().equals(primary.currentNodeId()) == false) {
297299
throw new NoShardAvailableActionException(primary.shardId(), "shard [{}] not assigned to this node [{}], but node [{}]", primary.shardId(), clusterService.localNode().id(), primary.currentNodeId());
298300
}
@@ -563,7 +565,7 @@ protected void doRun() {
563565
retryBecauseUnavailable(shardIt.shardId(), "Primary shard is not active or isn't assigned to a known node.");
564566
return;
565567
}
566-
moveToPrimaryAction(primary);
568+
routeRequestOrPerformPrimaryActionLocally(primary);
567569
}
568570

569571
/**
@@ -618,52 +620,92 @@ protected ShardRouting resolvePrimary(ShardIterator shardIt) {
618620
/**
619621
* send the request to the node holding the primary or execute if local
620622
*/
621-
protected void moveToPrimaryAction(final ShardRouting primary) {
623+
protected void routeRequestOrPerformPrimaryActionLocally(final ShardRouting primary) {
622624
DiscoveryNode node = observer.observedState().nodes().get(primary.currentNodeId());
623-
Request request = internalRequest.request();
624-
request.internalShardRouting = primary;
625-
transportService.sendRequest(node, transportPrimaryAction, request, transportOptions, new BaseTransportResponseHandler<Response>() {
625+
if (primary.currentNodeId().equals(observer.observedState().nodes().localNodeId())) {
626+
Request request = internalRequest.request();
627+
request.internalShardRouting = primary;
628+
// this call is always local, but in the future we can send to remote nodes as well
629+
transportService.sendRequest(node, transportPrimaryAction, request, transportOptions, new BaseTransportResponseHandler<Response>() {
626630

627-
@Override
628-
public Response newInstance() {
629-
return newResponseInstance();
630-
}
631+
@Override
632+
public Response newInstance() {
633+
return newResponseInstance();
634+
}
631635

632-
@Override
633-
public String executor() {
634-
return ThreadPool.Names.SAME;
635-
}
636+
@Override
637+
public String executor() {
638+
return ThreadPool.Names.SAME;
639+
}
636640

637-
@Override
638-
public void handleResponse(Response response) {
639-
finishOnRemoteSuccess(response);
640-
}
641+
@Override
642+
public void handleResponse(Response response) {
643+
finishOnRemoteSuccess(response);
644+
}
641645

642-
@Override
643-
public void handleException(TransportException exp) {
644-
try {
645-
Throwable cause = exp.getCause();
646-
// if we got disconnected from the node, or the node / shard is not in the right state (being closed)
647-
if (cause instanceof ConnectTransportException || cause instanceof NodeClosedException ||
648-
cause instanceof UnavailableShardsException || retryPrimaryException(cause)) {
649-
internalRequest.request().setCanHaveDuplicates();
650-
// we already marked it as started when we executed it (removed the listener) so pass false
651-
// to re-add to the cluster listener
652-
logger.trace("received an error from node the primary was assigned to ({}), scheduling a retry", exp.getMessage());
653-
if (cause instanceof UnavailableShardsException) {
654-
UnavailableShardsException use = (UnavailableShardsException) cause;
655-
retryBecauseUnavailable(use.getShardId(), use.getMessage());
646+
@Override
647+
public void handleException(TransportException exp) {
648+
try {
649+
Throwable cause = exp.getCause();
650+
// if we got disconnected from the node, or the node / shard is not in the right state (being closed)
651+
if (cause instanceof ConnectTransportException || cause instanceof NodeClosedException ||
652+
cause instanceof UnavailableShardsException || retryPrimaryException(cause)) {
653+
internalRequest.request().setCanHaveDuplicates();
654+
// we already marked it as started when we executed it (removed the listener) so pass false
655+
// to re-add to the cluster listener
656+
logger.trace("received an error from node the primary was assigned to ({}), scheduling a retry", exp.getMessage());
657+
if (cause instanceof UnavailableShardsException) {
658+
UnavailableShardsException use = (UnavailableShardsException) cause;
659+
retryBecauseUnavailable(use.getShardId(), use.getMessage());
660+
} else {
661+
retry(exp);
662+
}
656663
} else {
664+
finishAsFailed(exp);
665+
}
666+
} catch (Throwable t) {
667+
finishWithUnexpectedFailure(t);
668+
}
669+
}
670+
});
671+
} else {
672+
transportService.sendRequest(node, actionName, internalRequest.request(), transportOptions, new BaseTransportResponseHandler<Response>() {
673+
674+
@Override
675+
public Response newInstance() {
676+
return newResponseInstance();
677+
}
678+
679+
@Override
680+
public String executor() {
681+
return ThreadPool.Names.SAME;
682+
}
683+
684+
@Override
685+
public void handleResponse(Response response) {
686+
finishOnRemoteSuccess(response);
687+
}
688+
689+
@Override
690+
public void handleException(TransportException exp) {
691+
try {
692+
// if we got disconnected from the node, or the node / shard is not in the right state (being closed)
693+
if (exp.unwrapCause() instanceof ConnectTransportException || exp.unwrapCause() instanceof NodeClosedException ||
694+
retryPrimaryException(exp)) {
695+
internalRequest.request().setCanHaveDuplicates();
696+
// we already marked it as started when we executed it (removed the listener) so pass false
697+
// to re-add to the cluster listener
698+
logger.trace("received an error from node the primary was assigned to ({}), scheduling a retry", exp.getMessage());
657699
retry(exp);
700+
} else {
701+
finishAsFailed(exp);
658702
}
659-
} else {
660-
finishAsFailed(exp);
703+
} catch (Throwable t) {
704+
finishWithUnexpectedFailure(t);
661705
}
662-
} catch (Throwable t) {
663-
finishWithUnexpectedFailure(t);
664706
}
665-
}
666-
});
707+
});
708+
}
667709
}
668710

669711
void retry(Throwable failure) {

core/src/test/java/org/elasticsearch/action/support/replication/ShardReplicationTests.java

+7-9
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666

6767
import java.io.IOException;
6868
import java.util.HashSet;
69+
import java.util.List;
6970
import java.util.Set;
7071
import java.util.concurrent.CountDownLatch;
7172
import java.util.concurrent.ExecutionException;
@@ -342,24 +343,21 @@ public void testRoutingToPrimary() {
342343

343344
TransportReplicationAction<Request, Request, Response>.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, listener);
344345
assertTrue(primaryPhase.checkBlocks());
346+
primaryPhase.routeRequestOrPerformPrimaryActionLocally(shardRoutingTable.primaryShard());
345347
if (primaryNodeId.equals(clusterService.localNode().id())) {
346348
logger.info("--> primary is assigned locally, testing for execution");
347-
primaryPhase.moveToPrimaryAction(shardRoutingTable.primaryShard());
348349
assertTrue("request failed to be processed on a local primary", request.processedOnPrimary.get());
349350
if (transport.capturedRequests().length > 0) {
350351
assertIndexShardCounter(2);
351352
} else {
352353
assertIndexShardCounter(1);
353354
}
354355
} else {
355-
// The coordinating node says primary shard is on the local node, but the local node doesn't have it,
356-
// We need to retry; something has changed in the time between when the coordinating node received the request
357-
// and the node holding the primary shard processing the write request.
358-
// So we fail and retry (wait on a new cluster update or the timeout to expire) again from the coordinating node.
359-
logger.info("--> primary is assigned to [{}], checking request is going to be retried at some point", primaryNodeId);
360-
assertThat(clusterService.getListeners().size(), equalTo(0));
361-
primaryPhase.moveToPrimaryAction(shardRoutingTable.primaryShard());
362-
assertThat(clusterService.getListeners().size(), equalTo(1));
356+
logger.info("--> primary is assigned to [{}], checking request forwarded", primaryNodeId);
357+
final List<CapturingTransport.CapturedRequest> capturedRequests = transport.capturedRequestsByTargetNode().get(primaryNodeId);
358+
assertThat(capturedRequests, notNullValue());
359+
assertThat(capturedRequests.size(), equalTo(1));
360+
assertThat(capturedRequests.get(0).action, equalTo("testAction"));
363361
assertIndexShardUninitialized();
364362
}
365363
}

0 commit comments

Comments
 (0)