Skip to content

Commit f9f7261

Browse files
committed
Revert "Revert "[RCI] Check blocks while having index shard permit in TransportReplicationAction (#35332)""
This reverts commit d3d7c01
1 parent b6014d9 commit f9f7261

File tree

2 files changed

+234
-136
lines changed

2 files changed

+234
-136
lines changed

server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

Lines changed: 75 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -235,9 +235,39 @@ protected TransportRequestOptions transportOptions(Settings settings) {
235235
return TransportRequestOptions.EMPTY;
236236
}
237237

238+
private String concreteIndex(final ClusterState state, final ReplicationRequest request) {
239+
return resolveIndex() ? indexNameExpressionResolver.concreteSingleIndex(state, request).getName() : request.index();
240+
}
241+
242+
private ClusterBlockException blockExceptions(final ClusterState state, final String indexName) {
243+
ClusterBlockLevel globalBlockLevel = globalBlockLevel();
244+
if (globalBlockLevel != null) {
245+
ClusterBlockException blockException = state.blocks().globalBlockedException(globalBlockLevel);
246+
if (blockException != null) {
247+
return blockException;
248+
}
249+
}
250+
ClusterBlockLevel indexBlockLevel = indexBlockLevel();
251+
if (indexBlockLevel != null) {
252+
ClusterBlockException blockException = state.blocks().indexBlockedException(indexBlockLevel, indexName);
253+
if (blockException != null) {
254+
return blockException;
255+
}
256+
}
257+
return null;
258+
}
259+
238260
protected boolean retryPrimaryException(final Throwable e) {
239261
return e.getClass() == ReplicationOperation.RetryOnPrimaryException.class
240-
|| TransportActions.isShardNotAvailableException(e);
262+
|| TransportActions.isShardNotAvailableException(e)
263+
|| isRetryableClusterBlockException(e);
264+
}
265+
266+
boolean isRetryableClusterBlockException(final Throwable e) {
267+
if (e instanceof ClusterBlockException) {
268+
return ((ClusterBlockException) e).retryable();
269+
}
270+
return false;
241271
}
242272

243273
protected class OperationTransportHandler implements TransportRequestHandler<Request> {
@@ -310,6 +340,15 @@ protected void doRun() throws Exception {
310340
@Override
311341
public void onResponse(PrimaryShardReference primaryShardReference) {
312342
try {
343+
final ClusterState clusterState = clusterService.state();
344+
final IndexMetaData indexMetaData = clusterState.metaData().getIndexSafe(primaryShardReference.routingEntry().index());
345+
346+
final ClusterBlockException blockException = blockExceptions(clusterState, indexMetaData.getIndex().getName());
347+
if (blockException != null) {
348+
logger.trace("cluster is blocked, action failed on primary", blockException);
349+
throw blockException;
350+
}
351+
313352
if (primaryShardReference.isRelocated()) {
314353
primaryShardReference.close(); // release shard operation lock as soon as possible
315354
setPhase(replicationTask, "primary_delegation");
@@ -323,7 +362,7 @@ public void onResponse(PrimaryShardReference primaryShardReference) {
323362
response.readFrom(in);
324363
return response;
325364
};
326-
DiscoveryNode relocatingNode = clusterService.state().nodes().get(primary.relocatingNodeId());
365+
DiscoveryNode relocatingNode = clusterState.nodes().get(primary.relocatingNodeId());
327366
transportService.sendRequest(relocatingNode, transportPrimaryAction,
328367
new ConcreteShardRequest<>(request, primary.allocationId().getRelocationId(), primaryTerm),
329368
transportOptions,
@@ -696,35 +735,42 @@ public void onFailure(Exception e) {
696735
protected void doRun() {
697736
setPhase(task, "routing");
698737
final ClusterState state = observer.setAndGetObservedState();
699-
if (handleBlockExceptions(state)) {
700-
return;
701-
}
702-
703-
// request does not have a shardId yet, we need to pass the concrete index to resolve shardId
704-
final String concreteIndex = concreteIndex(state);
705-
final IndexMetaData indexMetaData = state.metaData().index(concreteIndex);
706-
if (indexMetaData == null) {
707-
retry(new IndexNotFoundException(concreteIndex));
708-
return;
709-
}
710-
if (indexMetaData.getState() == IndexMetaData.State.CLOSE) {
711-
throw new IndexClosedException(indexMetaData.getIndex());
712-
}
738+
final String concreteIndex = concreteIndex(state, request);
739+
final ClusterBlockException blockException = blockExceptions(state, concreteIndex);
740+
if (blockException != null) {
741+
if (blockException.retryable()) {
742+
logger.trace("cluster is blocked, scheduling a retry", blockException);
743+
retry(blockException);
744+
} else {
745+
finishAsFailed(blockException);
746+
}
747+
} else {
748+
// request does not have a shardId yet, we need to pass the concrete index to resolve shardId
749+
final IndexMetaData indexMetaData = state.metaData().index(concreteIndex);
750+
if (indexMetaData == null) {
751+
retry(new IndexNotFoundException(concreteIndex));
752+
return;
753+
}
754+
if (indexMetaData.getState() == IndexMetaData.State.CLOSE) {
755+
throw new IndexClosedException(indexMetaData.getIndex());
756+
}
713757

714-
// resolve all derived request fields, so we can route and apply it
715-
resolveRequest(indexMetaData, request);
716-
assert request.shardId() != null : "request shardId must be set in resolveRequest";
717-
assert request.waitForActiveShards() != ActiveShardCount.DEFAULT : "request waitForActiveShards must be set in resolveRequest";
758+
// resolve all derived request fields, so we can route and apply it
759+
resolveRequest(indexMetaData, request);
760+
assert request.shardId() != null : "request shardId must be set in resolveRequest";
761+
assert request.waitForActiveShards() != ActiveShardCount.DEFAULT :
762+
"request waitForActiveShards must be set in resolveRequest";
718763

719-
final ShardRouting primary = primary(state);
720-
if (retryIfUnavailable(state, primary)) {
721-
return;
722-
}
723-
final DiscoveryNode node = state.nodes().get(primary.currentNodeId());
724-
if (primary.currentNodeId().equals(state.nodes().getLocalNodeId())) {
725-
performLocalAction(state, primary, node, indexMetaData);
726-
} else {
727-
performRemoteAction(state, primary, node);
764+
final ShardRouting primary = primary(state);
765+
if (retryIfUnavailable(state, primary)) {
766+
return;
767+
}
768+
final DiscoveryNode node = state.nodes().get(primary.currentNodeId());
769+
if (primary.currentNodeId().equals(state.nodes().getLocalNodeId())) {
770+
performLocalAction(state, primary, node, indexMetaData);
771+
} else {
772+
performRemoteAction(state, primary, node);
773+
}
728774
}
729775
}
730776

@@ -776,44 +822,11 @@ private boolean retryIfUnavailable(ClusterState state, ShardRouting primary) {
776822
return false;
777823
}
778824

779-
private String concreteIndex(ClusterState state) {
780-
return resolveIndex() ? indexNameExpressionResolver.concreteSingleIndex(state, request).getName() : request.index();
781-
}
782-
783825
private ShardRouting primary(ClusterState state) {
784826
IndexShardRoutingTable indexShard = state.getRoutingTable().shardRoutingTable(request.shardId());
785827
return indexShard.primaryShard();
786828
}
787829

788-
private boolean handleBlockExceptions(ClusterState state) {
789-
ClusterBlockLevel globalBlockLevel = globalBlockLevel();
790-
if (globalBlockLevel != null) {
791-
ClusterBlockException blockException = state.blocks().globalBlockedException(globalBlockLevel);
792-
if (blockException != null) {
793-
handleBlockException(blockException);
794-
return true;
795-
}
796-
}
797-
ClusterBlockLevel indexBlockLevel = indexBlockLevel();
798-
if (indexBlockLevel != null) {
799-
ClusterBlockException blockException = state.blocks().indexBlockedException(indexBlockLevel, concreteIndex(state));
800-
if (blockException != null) {
801-
handleBlockException(blockException);
802-
return true;
803-
}
804-
}
805-
return false;
806-
}
807-
808-
private void handleBlockException(ClusterBlockException blockException) {
809-
if (blockException.retryable()) {
810-
logger.trace("cluster is blocked, scheduling a retry", blockException);
811-
retry(blockException);
812-
} else {
813-
finishAsFailed(blockException);
814-
}
815-
}
816-
817830
private void performAction(final DiscoveryNode node, final String action, final boolean isPrimaryAction,
818831
final TransportRequest requestToPerform) {
819832
transportService.sendRequest(node, action, requestToPerform, transportOptions, new TransportResponseHandler<Response>() {

0 commit comments

Comments
 (0)