Skip to content

Commit 2f76c48

Browse files
committed
Propagate forceExecution when acquiring permit (#60634)
Currently the transport replication action does not propagate the force execution parameter when acquiring the indexing permit. The logic to acquire the index permit supports force execution, so this parameter should be propagate. Fixes #60359.
1 parent d88098c commit 2f76c48

File tree

5 files changed

+21
-12
lines changed

5 files changed

+21
-12
lines changed

server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ public abstract class TransportReplicationAction<
120120
protected final IndicesService indicesService;
121121
protected final TransportRequestOptions transportOptions;
122122
protected final String executor;
123+
protected final boolean forceExecutionOnPrimary;
123124

124125
// package private for testing
125126
protected final String transportReplicaAction;
@@ -158,6 +159,7 @@ protected TransportReplicationAction(Settings settings, String actionName, Trans
158159

159160
this.initialRetryBackoffBound = REPLICATION_INITIAL_RETRY_BACKOFF_BOUND.get(settings);
160161
this.retryTimeout = REPLICATION_RETRY_TIMEOUT.get(settings);
162+
this.forceExecutionOnPrimary = forceExecutionOnPrimary;
161163

162164
transportService.registerRequestHandler(actionName, ThreadPool.Names.SAME, requestReader, this::handleOperationRequest);
163165

@@ -906,7 +908,7 @@ void retryBecauseUnavailable(ShardId shardId, String message) {
906908
protected void acquirePrimaryOperationPermit(final IndexShard primary,
907909
final Request request,
908910
final ActionListener<Releasable> onAcquired) {
909-
primary.acquirePrimaryOperationPermit(onAcquired, executor, request);
911+
primary.acquirePrimaryOperationPermit(onAcquired, executor, request, forceExecutionOnPrimary);
910912
}
911913

912914
/**

server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java

+4-6
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,6 @@ public abstract class TransportWriteAction<
6060
Response extends ReplicationResponse & WriteResponse
6161
> extends TransportReplicationAction<Request, ReplicaRequest, Response> {
6262

63-
private final boolean forceExecution;
6463
private final IndexingPressure indexingPressure;
6564
private final String executor;
6665

@@ -74,13 +73,12 @@ protected TransportWriteAction(Settings settings, String actionName, TransportSe
7473
super(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
7574
request, replicaRequest, ThreadPool.Names.SAME, true, forceExecutionOnPrimary);
7675
this.executor = executor;
77-
this.forceExecution = forceExecutionOnPrimary;
7876
this.indexingPressure = indexingPressure;
7977
}
8078

8179
@Override
8280
protected Releasable checkOperationLimits(Request request) {
83-
return indexingPressure.markPrimaryOperationStarted(primaryOperationSize(request), forceExecution);
81+
return indexingPressure.markPrimaryOperationStarted(primaryOperationSize(request), forceExecutionOnPrimary);
8482
}
8583

8684
@Override
@@ -97,7 +95,7 @@ protected Releasable checkPrimaryLimits(Request request, boolean rerouteWasLocal
9795
// If this primary request was received directly from the network, we must mark a new primary
9896
// operation. This happens if the write action skips the reroute step (ex: rsync) or during
9997
// primary delegation, after the primary relocation hand-off.
100-
return indexingPressure.markPrimaryOperationStarted(primaryOperationSize(request), forceExecution);
98+
return indexingPressure.markPrimaryOperationStarted(primaryOperationSize(request), forceExecutionOnPrimary);
10199
}
102100
}
103101

@@ -107,7 +105,7 @@ protected long primaryOperationSize(Request request) {
107105

108106
@Override
109107
protected Releasable checkReplicaLimits(ReplicaRequest request) {
110-
return indexingPressure.markReplicaOperationStarted(replicaOperationSize(request), forceExecution);
108+
return indexingPressure.markReplicaOperationStarted(replicaOperationSize(request), forceExecutionOnPrimary);
111109
}
112110

113111
protected long replicaOperationSize(ReplicaRequest request) {
@@ -163,7 +161,7 @@ protected void doRun() {
163161

164162
@Override
165163
public boolean isForceExecution() {
166-
return forceExecution;
164+
return forceExecutionOnPrimary;
167165
}
168166
});
169167
}

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -2791,10 +2791,16 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {
27912791
* isn't used
27922792
*/
27932793
public void acquirePrimaryOperationPermit(ActionListener<Releasable> onPermitAcquired, String executorOnDelay, Object debugInfo) {
2794+
acquirePrimaryOperationPermit(onPermitAcquired, executorOnDelay, debugInfo, false);
2795+
}
2796+
2797+
public void acquirePrimaryOperationPermit(ActionListener<Releasable> onPermitAcquired, String executorOnDelay, Object debugInfo,
2798+
boolean forceExecution) {
27942799
verifyNotClosed();
27952800
assert shardRouting.primary() : "acquirePrimaryOperationPermit should only be called on primary shard: " + shardRouting;
27962801

2797-
indexShardOperationPermits.acquire(wrapPrimaryOperationPermitListener(onPermitAcquired), executorOnDelay, false, debugInfo);
2802+
indexShardOperationPermits.acquire(wrapPrimaryOperationPermitListener(onPermitAcquired), executorOnDelay, forceExecution,
2803+
debugInfo);
27982804
}
27992805

28002806
/**

server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ public void testResyncDoesNotBlockOnPrimaryAction() throws Exception {
131131
acquiredPermits.incrementAndGet();
132132
callback.onResponse(acquiredPermits::decrementAndGet);
133133
return null;
134-
}).when(indexShard).acquirePrimaryOperationPermit(any(ActionListener.class), anyString(), anyObject());
134+
}).when(indexShard).acquirePrimaryOperationPermit(any(ActionListener.class), anyString(), anyObject(), eq(true));
135135
when(indexShard.getReplicationGroup()).thenReturn(
136136
new ReplicationGroup(shardRoutingTable,
137137
clusterService.state().metadata().index(index).inSyncAllocationIds(shardId.id()),

server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java

+6-3
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@
126126
import static org.mockito.Matchers.anyLong;
127127
import static org.mockito.Matchers.anyObject;
128128
import static org.mockito.Matchers.anyString;
129+
import static org.mockito.Matchers.eq;
129130
import static org.mockito.Mockito.doAnswer;
130131
import static org.mockito.Mockito.doThrow;
131132
import static org.mockito.Mockito.mock;
@@ -152,6 +153,7 @@ public static <R extends ReplicationRequest> R resolveRequest(TransportRequest r
152153

153154
private static ThreadPool threadPool;
154155

156+
private boolean forceExecute;
155157
private ClusterService clusterService;
156158
private TransportService transportService;
157159
private CapturingTransport transport;
@@ -172,6 +174,7 @@ public static void beforeClass() {
172174
@Before
173175
public void setUp() throws Exception {
174176
super.setUp();
177+
forceExecute = randomBoolean();
175178
transport = new CapturingTransport();
176179
clusterService = createClusterService(threadPool);
177180
transportService = transport.createTransportService(clusterService.getSettings(), threadPool,
@@ -839,7 +842,7 @@ public void testSeqNoIsSetOnPrimary() {
839842
//noinspection unchecked
840843
((ActionListener<Releasable>)invocation.getArguments()[0]).onResponse(count::decrementAndGet);
841844
return null;
842-
}).when(shard).acquirePrimaryOperationPermit(any(), anyString(), anyObject());
845+
}).when(shard).acquirePrimaryOperationPermit(any(), anyString(), anyObject(), eq(forceExecute));
843846
when(shard.getActiveOperationsCount()).thenAnswer(i -> count.get());
844847

845848
final IndexService indexService = mock(IndexService.class);
@@ -1272,7 +1275,7 @@ private class TestAction extends TransportReplicationAction<Request, Request, Te
12721275
super(settings, actionName, transportService, clusterService, indicesService, threadPool,
12731276
shardStateAction,
12741277
new ActionFilters(new HashSet<>()),
1275-
Request::new, Request::new, ThreadPool.Names.SAME);
1278+
Request::new, Request::new, ThreadPool.Names.SAME, false, forceExecute);
12761279
}
12771280

12781281
@Override
@@ -1343,7 +1346,7 @@ private IndexShard mockIndexShard(ShardId shardId, ClusterService clusterService
13431346
callback.onFailure(new ShardNotInPrimaryModeException(shardId, IndexShardState.STARTED));
13441347
}
13451348
return null;
1346-
}).when(indexShard).acquirePrimaryOperationPermit(any(ActionListener.class), anyString(), anyObject());
1349+
}).when(indexShard).acquirePrimaryOperationPermit(any(ActionListener.class), anyString(), anyObject(), eq(forceExecute));
13471350
doAnswer(invocation -> {
13481351
long term = (Long)invocation.getArguments()[0];
13491352
ActionListener<Releasable> callback = (ActionListener<Releasable>) invocation.getArguments()[3];

0 commit comments

Comments
 (0)