Skip to content

Commit 1cf9436

Browse files
tlrx and original-brownbear
authored and committed
Expose all permits acquisition in IndexShard and TransportReplicationAction (#35540)
This pull request exposes two new methods in the IndexShard and TransportReplicationAction classes in order to allow transport replication actions to acquire all index shard operation permits for their execution. It first adds the acquireAllPrimaryOperationsPermits() and the acquireAllReplicaOperationsPermits() methods to the IndexShard class, which allow acquiring all operation permits on a shard while exposing a Releasable. It also refactors the TransportReplicationAction class to expose two protected methods (acquirePrimaryOperationPermit() and acquireReplicaOperationPermit()) that can be overridden when a transport replication action requires the acquisition of all permits on the primary and/or replica shard during execution. Finally, it adds a TransportReplicationAllPermitsAcquisitionTests class, which illustrates how a transport replication action can grab all permits before adding a cluster block in the cluster state, causing subsequent operations that require a single permit to fail. Related to elastic #33888
1 parent d0b5006 commit 1cf9436

File tree

5 files changed

+874
-146
lines changed

5 files changed

+874
-146
lines changed

server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

Lines changed: 47 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -313,7 +313,7 @@ public void messageReceived(ConcreteShardRequest<Request> request, TransportChan
313313
}
314314
}
315315

316-
class AsyncPrimaryAction extends AbstractRunnable implements ActionListener<PrimaryShardReference> {
316+
class AsyncPrimaryAction extends AbstractRunnable {
317317

318318
private final Request request;
319319
// targetAllocationID of the shard this request is meant for
@@ -334,11 +334,33 @@ class AsyncPrimaryAction extends AbstractRunnable implements ActionListener<Prim
334334

335335
@Override
336336
protected void doRun() throws Exception {
337-
acquirePrimaryShardReference(request.shardId(), targetAllocationID, primaryTerm, this, request);
337+
final ShardId shardId = request.shardId();
338+
final IndexShard indexShard = getIndexShard(shardId);
339+
final ShardRouting shardRouting = indexShard.routingEntry();
340+
// we may end up here if the cluster state used to route the primary is so stale that the underlying
341+
// index shard was replaced with a replica. For example - in a two node cluster, if the primary fails
342+
// the replica will take over and a replica will be assigned to the first node.
343+
if (shardRouting.primary() == false) {
344+
throw new ReplicationOperation.RetryOnPrimaryException(shardId, "actual shard is not a primary " + shardRouting);
345+
}
346+
final String actualAllocationId = shardRouting.allocationId().getId();
347+
if (actualAllocationId.equals(targetAllocationID) == false) {
348+
throw new ShardNotFoundException(shardId, "expected allocation id [{}] but found [{}]", targetAllocationID,
349+
actualAllocationId);
350+
}
351+
final long actualTerm = indexShard.getPendingPrimaryTerm();
352+
if (actualTerm != primaryTerm) {
353+
throw new ShardNotFoundException(shardId, "expected allocation id [{}] with term [{}] but found [{}]", targetAllocationID,
354+
primaryTerm, actualTerm);
355+
}
356+
357+
acquirePrimaryOperationPermit(indexShard, request, ActionListener.wrap(
358+
releasable -> runWithPrimaryShardReference(new PrimaryShardReference(indexShard, releasable)),
359+
this::onFailure
360+
));
338361
}
339362

340-
@Override
341-
public void onResponse(PrimaryShardReference primaryShardReference) {
363+
void runWithPrimaryShardReference(final PrimaryShardReference primaryShardReference) {
342364
try {
343365
final ClusterState clusterState = clusterService.state();
344366
final IndexMetaData indexMetaData = clusterState.metaData().getIndexSafe(primaryShardReference.routingEntry().index());
@@ -660,10 +682,10 @@ protected void doRun() throws Exception {
660682
setPhase(task, "replica");
661683
final String actualAllocationId = this.replica.routingEntry().allocationId().getId();
662684
if (actualAllocationId.equals(targetAllocationID) == false) {
663-
throw new ShardNotFoundException(this.replica.shardId(), "expected aID [{}] but found [{}]", targetAllocationID,
685+
throw new ShardNotFoundException(this.replica.shardId(), "expected allocation id [{}] but found [{}]", targetAllocationID,
664686
actualAllocationId);
665687
}
666-
replica.acquireReplicaOperationPermit(primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, this, executor, request);
688+
acquireReplicaOperationPermit(replica, request, this, primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes);
667689
}
668690

669691
/**
@@ -697,7 +719,7 @@ public void onFailure(Exception e) {
697719
}
698720
}
699721

700-
protected IndexShard getIndexShard(ShardId shardId) {
722+
protected IndexShard getIndexShard(final ShardId shardId) {
701723
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
702724
return indexService.getShard(shardId.id());
703725
}
@@ -938,42 +960,26 @@ void retryBecauseUnavailable(ShardId shardId, String message) {
938960
}
939961

940962
/**
941-
* Tries to acquire reference to {@link IndexShard} to perform a primary operation. Released after performing primary operation locally
942-
* and replication of the operation to all replica shards is completed / failed (see {@link ReplicationOperation}).
963+
* Executes the logic for acquiring one or more operation permits on a primary shard. The default is to acquire a single permit but this
964+
* method can be overridden to acquire more.
943965
*/
944-
private void acquirePrimaryShardReference(ShardId shardId, String allocationId, long primaryTerm,
945-
ActionListener<PrimaryShardReference> onReferenceAcquired, Object debugInfo) {
946-
IndexShard indexShard = getIndexShard(shardId);
947-
// we may end up here if the cluster state used to route the primary is so stale that the underlying
948-
// index shard was replaced with a replica. For example - in a two node cluster, if the primary fails
949-
// the replica will take over and a replica will be assigned to the first node.
950-
if (indexShard.routingEntry().primary() == false) {
951-
throw new ReplicationOperation.RetryOnPrimaryException(indexShard.shardId(),
952-
"actual shard is not a primary " + indexShard.routingEntry());
953-
}
954-
final String actualAllocationId = indexShard.routingEntry().allocationId().getId();
955-
if (actualAllocationId.equals(allocationId) == false) {
956-
throw new ShardNotFoundException(shardId, "expected aID [{}] but found [{}]", allocationId, actualAllocationId);
957-
}
958-
final long actualTerm = indexShard.getPendingPrimaryTerm();
959-
if (actualTerm != primaryTerm) {
960-
throw new ShardNotFoundException(shardId, "expected aID [{}] with term [{}] but found [{}]", allocationId,
961-
primaryTerm, actualTerm);
962-
}
963-
964-
ActionListener<Releasable> onAcquired = new ActionListener<Releasable>() {
965-
@Override
966-
public void onResponse(Releasable releasable) {
967-
onReferenceAcquired.onResponse(new PrimaryShardReference(indexShard, releasable));
968-
}
969-
970-
@Override
971-
public void onFailure(Exception e) {
972-
onReferenceAcquired.onFailure(e);
973-
}
974-
};
966+
protected void acquirePrimaryOperationPermit(final IndexShard primary,
967+
final Request request,
968+
final ActionListener<Releasable> onAcquired) {
969+
primary.acquirePrimaryOperationPermit(onAcquired, executor, request);
970+
}
975971

976-
indexShard.acquirePrimaryOperationPermit(onAcquired, executor, debugInfo);
972+
/**
973+
* Executes the logic for acquiring one or more operation permits on a replica shard. The default is to acquire a single permit but this
974+
* method can be overridden to acquire more.
975+
*/
976+
protected void acquireReplicaOperationPermit(final IndexShard replica,
977+
final ReplicaRequest request,
978+
final ActionListener<Releasable> onAcquired,
979+
final long primaryTerm,
980+
final long globalCheckpoint,
981+
final long maxSeqNoOfUpdatesOrDeletes) {
982+
replica.acquireReplicaOperationPermit(primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, onAcquired, executor, request);
977983
}
978984

979985
class ShardReference implements Releasable {

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 84 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -2302,7 +2302,18 @@ public void acquirePrimaryOperationPermit(ActionListener<Releasable> onPermitAcq
23022302
indexShardOperationPermits.acquire(onPermitAcquired, executorOnDelay, false, debugInfo);
23032303
}
23042304

2305-
private <E extends Exception> void bumpPrimaryTerm(long newPrimaryTerm, final CheckedRunnable<E> onBlocked) {
2305+
/**
2306+
* Acquire all primary operation permits. Once all permits are acquired, the provided ActionListener is called.
2307+
* It is the responsibility of the caller to close the {@link Releasable}.
2308+
*/
2309+
public void acquireAllPrimaryOperationsPermits(final ActionListener<Releasable> onPermitAcquired, final TimeValue timeout) {
2310+
verifyNotClosed();
2311+
assert shardRouting.primary() : "acquireAllPrimaryOperationsPermits should only be called on primary shard: " + shardRouting;
2312+
2313+
indexShardOperationPermits.asyncBlockOperations(onPermitAcquired, timeout.duration(), timeout.timeUnit());
2314+
}
2315+
2316+
private <E extends Exception> void bumpPrimaryTerm(final long newPrimaryTerm, final CheckedRunnable<E> onBlocked) {
23062317
assert Thread.holdsLock(mutex);
23072318
assert newPrimaryTerm > pendingPrimaryTerm;
23082319
assert operationPrimaryTerm <= pendingPrimaryTerm;
@@ -2357,11 +2368,42 @@ public void onResponse(final Releasable releasable) {
23572368
public void acquireReplicaOperationPermit(final long opPrimaryTerm, final long globalCheckpoint, final long maxSeqNoOfUpdatesOrDeletes,
23582369
final ActionListener<Releasable> onPermitAcquired, final String executorOnDelay,
23592370
final Object debugInfo) {
2371+
innerAcquireReplicaOperationPermit(opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, onPermitAcquired,
2372+
(listener) -> indexShardOperationPermits.acquire(listener, executorOnDelay, true, debugInfo));
2373+
}
2374+
2375+
/**
2376+
* Acquire all replica operation permits whenever the shard is ready for indexing (see
2377+
* {@link #acquireAllPrimaryOperationsPermits(ActionListener, TimeValue)}). If the given primary term is lower than the one in
2378+
* {@link #shardRouting}, the {@link ActionListener#onFailure(Exception)} method of the provided listener is invoked with an
2379+
* {@link IllegalStateException}.
2380+
*
2381+
* @param opPrimaryTerm the operation primary term
2382+
* @param globalCheckpoint the global checkpoint associated with the request
2383+
* @param maxSeqNoOfUpdatesOrDeletes the max seq_no of updates (index operations overwrite Lucene) or deletes captured on the primary
2384+
* after this replication request was executed on it (see {@link #getMaxSeqNoOfUpdatesOrDeletes()})
2385+
* @param onPermitAcquired the listener for permit acquisition
2386+
* @param timeout the maximum time to wait for the in-flight operations block
2387+
*/
2388+
public void acquireAllReplicaOperationsPermits(final long opPrimaryTerm,
2389+
final long globalCheckpoint,
2390+
final long maxSeqNoOfUpdatesOrDeletes,
2391+
final ActionListener<Releasable> onPermitAcquired,
2392+
final TimeValue timeout) {
2393+
innerAcquireReplicaOperationPermit(opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, onPermitAcquired,
2394+
(listener) -> indexShardOperationPermits.asyncBlockOperations(listener, timeout.duration(), timeout.timeUnit()));
2395+
}
2396+
2397+
private void innerAcquireReplicaOperationPermit(final long opPrimaryTerm,
2398+
final long globalCheckpoint,
2399+
final long maxSeqNoOfUpdatesOrDeletes,
2400+
final ActionListener<Releasable> onPermitAcquired,
2401+
final Consumer<ActionListener<Releasable>> consumer) {
23602402
verifyNotClosed();
23612403
if (opPrimaryTerm > pendingPrimaryTerm) {
23622404
synchronized (mutex) {
23632405
if (opPrimaryTerm > pendingPrimaryTerm) {
2364-
IndexShardState shardState = state();
2406+
final IndexShardState shardState = state();
23652407
// only roll translog and update primary term if shard has made it past recovery
23662408
// Having a new primary term here means that the old primary failed and that there is a new primary, which again
23672409
// means that the master will fail this shard as all initializing shards are failed when a primary is selected
@@ -2373,58 +2415,54 @@ public void acquireReplicaOperationPermit(final long opPrimaryTerm, final long g
23732415

23742416
if (opPrimaryTerm > pendingPrimaryTerm) {
23752417
bumpPrimaryTerm(opPrimaryTerm, () -> {
2376-
updateGlobalCheckpointOnReplica(globalCheckpoint, "primary term transition");
2377-
final long currentGlobalCheckpoint = getGlobalCheckpoint();
2378-
final long maxSeqNo = seqNoStats().getMaxSeqNo();
2379-
logger.info("detected new primary with primary term [{}], global checkpoint [{}], max_seq_no [{}]",
2380-
opPrimaryTerm, currentGlobalCheckpoint, maxSeqNo);
2381-
if (currentGlobalCheckpoint < maxSeqNo) {
2382-
resetEngineToGlobalCheckpoint();
2383-
} else {
2384-
getEngine().rollTranslogGeneration();
2385-
}
2418+
updateGlobalCheckpointOnReplica(globalCheckpoint, "primary term transition");
2419+
final long currentGlobalCheckpoint = getGlobalCheckpoint();
2420+
final long maxSeqNo = seqNoStats().getMaxSeqNo();
2421+
logger.info("detected new primary with primary term [{}], global checkpoint [{}], max_seq_no [{}]",
2422+
opPrimaryTerm, currentGlobalCheckpoint, maxSeqNo);
2423+
if (currentGlobalCheckpoint < maxSeqNo) {
2424+
resetEngineToGlobalCheckpoint();
2425+
} else {
2426+
getEngine().rollTranslogGeneration();
2427+
}
23862428
});
23872429
}
23882430
}
23892431
}
23902432
}
2391-
23922433
assert opPrimaryTerm <= pendingPrimaryTerm
2393-
: "operation primary term [" + opPrimaryTerm + "] should be at most [" + pendingPrimaryTerm + "]";
2394-
indexShardOperationPermits.acquire(
2395-
new ActionListener<Releasable>() {
2396-
@Override
2397-
public void onResponse(final Releasable releasable) {
2398-
if (opPrimaryTerm < operationPrimaryTerm) {
2399-
releasable.close();
2400-
final String message = String.format(
2401-
Locale.ROOT,
2402-
"%s operation primary term [%d] is too old (current [%d])",
2403-
shardId,
2404-
opPrimaryTerm,
2405-
operationPrimaryTerm);
2406-
onPermitAcquired.onFailure(new IllegalStateException(message));
2407-
} else {
2408-
assert assertReplicationTarget();
2409-
try {
2410-
updateGlobalCheckpointOnReplica(globalCheckpoint, "operation");
2411-
advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNoOfUpdatesOrDeletes);
2412-
} catch (Exception e) {
2413-
releasable.close();
2414-
onPermitAcquired.onFailure(e);
2415-
return;
2416-
}
2417-
onPermitAcquired.onResponse(releasable);
2418-
}
2419-
}
2420-
2421-
@Override
2422-
public void onFailure(final Exception e) {
2434+
: "operation primary term [" + opPrimaryTerm + "] should be at most [" + pendingPrimaryTerm + "]";
2435+
consumer.accept(new ActionListener<Releasable>() {
2436+
@Override
2437+
public void onResponse(final Releasable releasable) {
2438+
if (opPrimaryTerm < operationPrimaryTerm) {
2439+
releasable.close();
2440+
final String message = String.format(
2441+
Locale.ROOT,
2442+
"%s operation primary term [%d] is too old (current [%d])",
2443+
shardId,
2444+
opPrimaryTerm,
2445+
operationPrimaryTerm);
2446+
onPermitAcquired.onFailure(new IllegalStateException(message));
2447+
} else {
2448+
assert assertReplicationTarget();
2449+
try {
2450+
updateGlobalCheckpointOnReplica(globalCheckpoint, "operation");
2451+
advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNoOfUpdatesOrDeletes);
2452+
} catch (Exception e) {
2453+
releasable.close();
24232454
onPermitAcquired.onFailure(e);
2455+
return;
24242456
}
2425-
},
2426-
executorOnDelay,
2427-
true, debugInfo);
2457+
onPermitAcquired.onResponse(releasable);
2458+
}
2459+
}
2460+
2461+
@Override
2462+
public void onFailure(final Exception e) {
2463+
onPermitAcquired.onFailure(e);
2464+
}
2465+
});
24282466
}
24292467

24302468
public int getActiveOperationsCount() {

server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -949,11 +949,11 @@ action.new PrimaryOperationTransportHandler().messageReceived(
949949
logger.debug("got exception:" , throwable);
950950
assertTrue(throwable.getClass() + " is not a retry exception", action.retryPrimaryException(throwable));
951951
if (wrongAllocationId) {
952-
assertThat(throwable.getMessage(), containsString("expected aID [_not_a_valid_aid_] but found [" +
952+
assertThat(throwable.getMessage(), containsString("expected allocation id [_not_a_valid_aid_] but found [" +
953953
primary.allocationId().getId() + "]"));
954954
} else {
955-
assertThat(throwable.getMessage(), containsString("expected aID [" + primary.allocationId().getId() + "] with term [" +
956-
requestTerm + "] but found [" + primaryTerm + "]"));
955+
assertThat(throwable.getMessage(), containsString("expected allocation id [" + primary.allocationId().getId()
956+
+ "] with term [" + requestTerm + "] but found [" + primaryTerm + "]"));
957957
}
958958
}
959959
}

0 commit comments

Comments
 (0)