Skip to content

Commit fe780aa

Browse files
authored
Propagate forceExecution when acquiring permit (#60634)
Currently the transport replication action does not propagate the force execution parameter when acquiring the indexing permit. The logic to acquire the index permit supports force execution, so this parameter should be propagate. Fixes #60359.
1 parent 9497f66 commit fe780aa

File tree

5 files changed

+21
-12
lines changed

5 files changed

+21
-12
lines changed

server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ public abstract class TransportReplicationAction<
119119
protected final IndicesService indicesService;
120120
protected final TransportRequestOptions transportOptions;
121121
protected final String executor;
122+
protected final boolean forceExecutionOnPrimary;
122123

123124
// package private for testing
124125
protected final String transportReplicaAction;
@@ -157,6 +158,7 @@ protected TransportReplicationAction(Settings settings, String actionName, Trans
157158

158159
this.initialRetryBackoffBound = REPLICATION_INITIAL_RETRY_BACKOFF_BOUND.get(settings);
159160
this.retryTimeout = REPLICATION_RETRY_TIMEOUT.get(settings);
161+
this.forceExecutionOnPrimary = forceExecutionOnPrimary;
160162

161163
transportService.registerRequestHandler(actionName, ThreadPool.Names.SAME, requestReader, this::handleOperationRequest);
162164

@@ -905,7 +907,7 @@ void retryBecauseUnavailable(ShardId shardId, String message) {
905907
protected void acquirePrimaryOperationPermit(final IndexShard primary,
906908
final Request request,
907909
final ActionListener<Releasable> onAcquired) {
908-
primary.acquirePrimaryOperationPermit(onAcquired, executor, request);
910+
primary.acquirePrimaryOperationPermit(onAcquired, executor, request, forceExecutionOnPrimary);
909911
}
910912

911913
/**

server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java

+4-6
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,6 @@ public abstract class TransportWriteAction<
6060
Response extends ReplicationResponse & WriteResponse
6161
> extends TransportReplicationAction<Request, ReplicaRequest, Response> {
6262

63-
private final boolean forceExecution;
6463
private final IndexingPressure indexingPressure;
6564
private final String executor;
6665

@@ -74,13 +73,12 @@ protected TransportWriteAction(Settings settings, String actionName, TransportSe
7473
super(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
7574
request, replicaRequest, ThreadPool.Names.SAME, true, forceExecutionOnPrimary);
7675
this.executor = executor;
77-
this.forceExecution = forceExecutionOnPrimary;
7876
this.indexingPressure = indexingPressure;
7977
}
8078

8179
@Override
8280
protected Releasable checkOperationLimits(Request request) {
83-
return indexingPressure.markPrimaryOperationStarted(primaryOperationSize(request), forceExecution);
81+
return indexingPressure.markPrimaryOperationStarted(primaryOperationSize(request), forceExecutionOnPrimary);
8482
}
8583

8684
@Override
@@ -97,7 +95,7 @@ protected Releasable checkPrimaryLimits(Request request, boolean rerouteWasLocal
9795
// If this primary request was received directly from the network, we must mark a new primary
9896
// operation. This happens if the write action skips the reroute step (ex: rsync) or during
9997
// primary delegation, after the primary relocation hand-off.
100-
return indexingPressure.markPrimaryOperationStarted(primaryOperationSize(request), forceExecution);
98+
return indexingPressure.markPrimaryOperationStarted(primaryOperationSize(request), forceExecutionOnPrimary);
10199
}
102100
}
103101

@@ -107,7 +105,7 @@ protected long primaryOperationSize(Request request) {
107105

108106
@Override
109107
protected Releasable checkReplicaLimits(ReplicaRequest request) {
110-
return indexingPressure.markReplicaOperationStarted(replicaOperationSize(request), forceExecution);
108+
return indexingPressure.markReplicaOperationStarted(replicaOperationSize(request), forceExecutionOnPrimary);
111109
}
112110

113111
protected long replicaOperationSize(ReplicaRequest request) {
@@ -163,7 +161,7 @@ protected void doRun() {
163161

164162
@Override
165163
public boolean isForceExecution() {
166-
return forceExecution;
164+
return forceExecutionOnPrimary;
167165
}
168166
});
169167
}

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -2723,10 +2723,16 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {
27232723
* isn't used
27242724
*/
27252725
public void acquirePrimaryOperationPermit(ActionListener<Releasable> onPermitAcquired, String executorOnDelay, Object debugInfo) {
2726+
acquirePrimaryOperationPermit(onPermitAcquired, executorOnDelay, debugInfo, false);
2727+
}
2728+
2729+
public void acquirePrimaryOperationPermit(ActionListener<Releasable> onPermitAcquired, String executorOnDelay, Object debugInfo,
2730+
boolean forceExecution) {
27262731
verifyNotClosed();
27272732
assert shardRouting.primary() : "acquirePrimaryOperationPermit should only be called on primary shard: " + shardRouting;
27282733

2729-
indexShardOperationPermits.acquire(wrapPrimaryOperationPermitListener(onPermitAcquired), executorOnDelay, false, debugInfo);
2734+
indexShardOperationPermits.acquire(wrapPrimaryOperationPermitListener(onPermitAcquired), executorOnDelay, forceExecution,
2735+
debugInfo);
27302736
}
27312737

27322738
/**

server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ public void testResyncDoesNotBlockOnPrimaryAction() throws Exception {
131131
acquiredPermits.incrementAndGet();
132132
callback.onResponse(acquiredPermits::decrementAndGet);
133133
return null;
134-
}).when(indexShard).acquirePrimaryOperationPermit(any(ActionListener.class), anyString(), anyObject());
134+
}).when(indexShard).acquirePrimaryOperationPermit(any(ActionListener.class), anyString(), anyObject(), eq(true));
135135
when(indexShard.getReplicationGroup()).thenReturn(
136136
new ReplicationGroup(shardRoutingTable,
137137
clusterService.state().metadata().index(index).inSyncAllocationIds(shardId.id()),

server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java

+6-3
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@
126126
import static org.mockito.Matchers.anyLong;
127127
import static org.mockito.Matchers.anyObject;
128128
import static org.mockito.Matchers.anyString;
129+
import static org.mockito.Matchers.eq;
129130
import static org.mockito.Mockito.doAnswer;
130131
import static org.mockito.Mockito.doThrow;
131132
import static org.mockito.Mockito.mock;
@@ -152,6 +153,7 @@ public static <R extends ReplicationRequest> R resolveRequest(TransportRequest r
152153

153154
private static ThreadPool threadPool;
154155

156+
private boolean forceExecute;
155157
private ClusterService clusterService;
156158
private TransportService transportService;
157159
private CapturingTransport transport;
@@ -172,6 +174,7 @@ public static void beforeClass() {
172174
@Before
173175
public void setUp() throws Exception {
174176
super.setUp();
177+
forceExecute = randomBoolean();
175178
transport = new CapturingTransport();
176179
clusterService = createClusterService(threadPool);
177180
transportService = transport.createTransportService(clusterService.getSettings(), threadPool,
@@ -839,7 +842,7 @@ public void testSeqNoIsSetOnPrimary() {
839842
//noinspection unchecked
840843
((ActionListener<Releasable>)invocation.getArguments()[0]).onResponse(count::decrementAndGet);
841844
return null;
842-
}).when(shard).acquirePrimaryOperationPermit(any(), anyString(), anyObject());
845+
}).when(shard).acquirePrimaryOperationPermit(any(), anyString(), anyObject(), eq(forceExecute));
843846
when(shard.getActiveOperationsCount()).thenAnswer(i -> count.get());
844847

845848
final IndexService indexService = mock(IndexService.class);
@@ -1272,7 +1275,7 @@ private class TestAction extends TransportReplicationAction<Request, Request, Te
12721275
super(settings, actionName, transportService, clusterService, indicesService, threadPool,
12731276
shardStateAction,
12741277
new ActionFilters(new HashSet<>()),
1275-
Request::new, Request::new, ThreadPool.Names.SAME);
1278+
Request::new, Request::new, ThreadPool.Names.SAME, false, forceExecute);
12761279
}
12771280

12781281
@Override
@@ -1343,7 +1346,7 @@ private IndexShard mockIndexShard(ShardId shardId, ClusterService clusterService
13431346
callback.onFailure(new ShardNotInPrimaryModeException(shardId, IndexShardState.STARTED));
13441347
}
13451348
return null;
1346-
}).when(indexShard).acquirePrimaryOperationPermit(any(ActionListener.class), anyString(), anyObject());
1349+
}).when(indexShard).acquirePrimaryOperationPermit(any(ActionListener.class), anyString(), anyObject(), eq(forceExecute));
13471350
doAnswer(invocation -> {
13481351
long term = (Long)invocation.getArguments()[0];
13491352
ActionListener<Releasable> callback = (ActionListener<Releasable>) invocation.getArguments()[3];

0 commit comments

Comments
 (0)