Skip to content

Commit ef255a2

Browse files
committed
Apply feedback
1 parent 493cd16 commit ef255a2

File tree

5 files changed

+183
-172
lines changed

5 files changed

+183
-172
lines changed

server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,7 @@ class AsyncPrimaryAction extends AbstractRunnable {
335335
@Override
336336
protected void doRun() throws Exception {
337337
final ShardId shardId = request.shardId();
338-
final IndexShard indexShard = getIndexShard(shardId, targetAllocationID);
338+
final IndexShard indexShard = getIndexShard(shardId);
339339
final ShardRouting shardRouting = indexShard.routingEntry();
340340
// we may end up here if the cluster state used to route the primary is so stale that the underlying
341341
// index shard was replaced with a replica. For example - in a two node cluster, if the primary fails
@@ -609,7 +609,7 @@ private final class AsyncReplicaAction extends AbstractRunnable implements Actio
609609
this.maxSeqNoOfUpdatesOrDeletes = maxSeqNoOfUpdatesOrDeletes;
610610
final ShardId shardId = request.shardId();
611611
assert shardId != null : "request shardId must be set";
612-
this.replica = getIndexShard(shardId, targetAllocationID);
612+
this.replica = getIndexShard(shardId);
613613
}
614614

615615
@Override
@@ -719,7 +719,7 @@ public void onFailure(Exception e) {
719719
}
720720
}
721721

722-
protected IndexShard getIndexShard(final ShardId shardId, final String targetAllocationID) {
722+
protected IndexShard getIndexShard(final ShardId shardId) {
723723
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
724724
return indexService.getShard(shardId.id());
725725
}

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 59 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -2348,7 +2348,58 @@ public void onResponse(final Releasable releasable) {
23482348
termUpdated.countDown();
23492349
}
23502350

2351-
private void updatePrimaryTermIfNeeded(final long opPrimaryTerm, final long globalCheckpoint) {
2351+
/**
2352+
* Acquire a replica operation permit whenever the shard is ready for indexing (see
2353+
* {@link #acquirePrimaryOperationPermit(ActionListener, String, Object)}). If the given primary term is lower than then one in
2354+
* {@link #shardRouting}, the {@link ActionListener#onFailure(Exception)} method of the provided listener is invoked with an
2355+
* {@link IllegalStateException}. If permit acquisition is delayed, the listener will be invoked on the executor with the specified
2356+
* name.
2357+
*
2358+
* @param opPrimaryTerm the operation primary term
2359+
* @param globalCheckpoint the global checkpoint associated with the request
2360+
* @param maxSeqNoOfUpdatesOrDeletes the max seq_no of updates (index operations overwrite Lucene) or deletes captured on the primary
2361+
* after this replication request was executed on it (see {@link #getMaxSeqNoOfUpdatesOrDeletes()}
2362+
* @param onPermitAcquired the listener for permit acquisition
2363+
* @param executorOnDelay the name of the executor to invoke the listener on if permit acquisition is delayed
2364+
* @param debugInfo an extra information that can be useful when tracing an unreleased permit. When assertions are
2365+
* enabled the tracing will capture the supplied object's {@link Object#toString()} value.
2366+
* Otherwise the object isn't used
2367+
*/
2368+
public void acquireReplicaOperationPermit(final long opPrimaryTerm, final long globalCheckpoint, final long maxSeqNoOfUpdatesOrDeletes,
2369+
final ActionListener<Releasable> onPermitAcquired, final String executorOnDelay,
2370+
final Object debugInfo) {
2371+
innerAcquireReplicaOperationPermit(opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, onPermitAcquired,
2372+
(listener) -> indexShardOperationPermits.acquire(listener, executorOnDelay, true, debugInfo));
2373+
}
2374+
2375+
/**
2376+
* Acquire all replica operation permits whenever the shard is ready for indexing (see
2377+
* {@link #acquireAllPrimaryOperationsPermits(ActionListener, TimeValue)}. If the given primary term is lower than then one in
2378+
* {@link #shardRouting}, the {@link ActionListener#onFailure(Exception)} method of the provided listener is invoked with an
2379+
* {@link IllegalStateException}.
2380+
*
2381+
* @param opPrimaryTerm the operation primary term
2382+
* @param globalCheckpoint the global checkpoint associated with the request
2383+
* @param maxSeqNoOfUpdatesOrDeletes the max seq_no of updates (index operations overwrite Lucene) or deletes captured on the primary
2384+
* after this replication request was executed on it (see {@link #getMaxSeqNoOfUpdatesOrDeletes()}
2385+
* @param onPermitAcquired the listener for permit acquisition
2386+
* @param timeout the maximum time to wait for the in-flight operations block
2387+
*/
2388+
public void acquireAllReplicaOperationsPermits(final long opPrimaryTerm,
2389+
final long globalCheckpoint,
2390+
final long maxSeqNoOfUpdatesOrDeletes,
2391+
final ActionListener<Releasable> onPermitAcquired,
2392+
final TimeValue timeout) {
2393+
innerAcquireReplicaOperationPermit(opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, onPermitAcquired,
2394+
(listener) -> indexShardOperationPermits.asyncBlockOperations(listener, timeout.duration(), timeout.timeUnit()));
2395+
}
2396+
2397+
private void innerAcquireReplicaOperationPermit(final long opPrimaryTerm,
2398+
final long globalCheckpoint,
2399+
final long maxSeqNoOfUpdatesOrDeletes,
2400+
final ActionListener<Releasable> onPermitAcquired,
2401+
final Consumer<ActionListener<Releasable>> consumer) {
2402+
verifyNotClosed();
23522403
if (opPrimaryTerm > pendingPrimaryTerm) {
23532404
synchronized (mutex) {
23542405
if (opPrimaryTerm > pendingPrimaryTerm) {
@@ -2381,19 +2432,7 @@ private void updatePrimaryTermIfNeeded(final long opPrimaryTerm, final long glob
23812432
}
23822433
assert opPrimaryTerm <= pendingPrimaryTerm
23832434
: "operation primary term [" + opPrimaryTerm + "] should be at most [" + pendingPrimaryTerm + "]";
2384-
}
2385-
2386-
/**
2387-
* Creates a new action listener which verifies that the operation primary term is not too old. If the given primary
2388-
* term is lower than the current one, the {@link ActionListener#onFailure(Exception)} method of the provided listener is invoked with
2389-
* an {@link IllegalStateException}. Otherwise the global checkpoint and the max_seq_no_of_updates marker of the replica are updated
2390-
* before the invocation of the {@link ActionListener#onResponse(Object)}} method of the provided listener.
2391-
*/
2392-
private ActionListener<Releasable> createListener(final ActionListener<Releasable> listener,
2393-
final long opPrimaryTerm,
2394-
final long globalCheckpoint,
2395-
final long maxSeqNoOfUpdatesOrDeletes) {
2396-
return new ActionListener<Releasable>() {
2435+
consumer.accept(new ActionListener<Releasable>() {
23972436
@Override
23982437
public void onResponse(final Releasable releasable) {
23992438
if (opPrimaryTerm < operationPrimaryTerm) {
@@ -2404,81 +2443,26 @@ public void onResponse(final Releasable releasable) {
24042443
shardId,
24052444
opPrimaryTerm,
24062445
operationPrimaryTerm);
2407-
listener.onFailure(new IllegalStateException(message));
2446+
onPermitAcquired.onFailure(new IllegalStateException(message));
24082447
} else {
24092448
assert assertReplicationTarget();
24102449
try {
24112450
updateGlobalCheckpointOnReplica(globalCheckpoint, "operation");
24122451
advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNoOfUpdatesOrDeletes);
2413-
} catch (final Exception e) {
2452+
} catch (Exception e) {
24142453
releasable.close();
2415-
listener.onFailure(e);
2454+
onPermitAcquired.onFailure(e);
24162455
return;
24172456
}
2418-
listener.onResponse(releasable);
2457+
onPermitAcquired.onResponse(releasable);
24192458
}
24202459
}
24212460

24222461
@Override
24232462
public void onFailure(final Exception e) {
2424-
listener.onFailure(e);
2463+
onPermitAcquired.onFailure(e);
24252464
}
2426-
};
2427-
}
2428-
2429-
/**
2430-
* Acquire a replica operation permit whenever the shard is ready for indexing (see
2431-
* {@link #acquirePrimaryOperationPermit(ActionListener, String, Object)}). If the given primary term is lower than then one in
2432-
* {@link #shardRouting}, the {@link ActionListener#onFailure(Exception)} method of the provided listener is invoked with an
2433-
* {@link IllegalStateException}. If permit acquisition is delayed, the listener will be invoked on the executor with the specified
2434-
* name.
2435-
*
2436-
* @param opPrimaryTerm the operation primary term
2437-
* @param globalCheckpoint the global checkpoint associated with the request
2438-
* @param maxSeqNoOfUpdatesOrDeletes the max seq_no of updates (index operations overwrite Lucene) or deletes captured on the primary
2439-
* after this replication request was executed on it (see {@link #getMaxSeqNoOfUpdatesOrDeletes()}
2440-
* @param onPermitAcquired the listener for permit acquisition
2441-
* @param executorOnDelay the name of the executor to invoke the listener on if permit acquisition is delayed
2442-
* @param debugInfo an extra information that can be useful when tracing an unreleased permit. When assertions are
2443-
* enabled the tracing will capture the supplied object's {@link Object#toString()} value.
2444-
* Otherwise the object isn't used
2445-
*/
2446-
public void acquireReplicaOperationPermit(final long opPrimaryTerm,
2447-
final long globalCheckpoint,
2448-
final long maxSeqNoOfUpdatesOrDeletes,
2449-
final ActionListener<Releasable> onPermitAcquired,
2450-
final String executorOnDelay,
2451-
final Object debugInfo) {
2452-
verifyNotClosed();
2453-
updatePrimaryTermIfNeeded(opPrimaryTerm, globalCheckpoint);
2454-
2455-
ActionListener<Releasable> listener = createListener(onPermitAcquired, opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes);
2456-
indexShardOperationPermits.acquire(listener, executorOnDelay, true, debugInfo);
2457-
}
2458-
2459-
/**
2460-
* Acquire all replica operation permits whenever the shard is ready for indexing (see
2461-
* {@link #acquireAllPrimaryOperationsPermits(ActionListener, TimeValue)}. If the given primary term is lower than then one in
2462-
* {@link #shardRouting}, the {@link ActionListener#onFailure(Exception)} method of the provided listener is invoked with an
2463-
* {@link IllegalStateException}.
2464-
*
2465-
* @param opPrimaryTerm the operation primary term
2466-
* @param globalCheckpoint the global checkpoint associated with the request
2467-
* @param maxSeqNoOfUpdatesOrDeletes the max seq_no of updates (index operations overwrite Lucene) or deletes captured on the primary
2468-
* after this replication request was executed on it (see {@link #getMaxSeqNoOfUpdatesOrDeletes()}
2469-
* @param onPermitAcquired the listener for permit acquisition
2470-
* @param timeout the maximum time to wait for the in-flight operations block
2471-
*/
2472-
public void acquireReplicaAllOperationsPermits(final long opPrimaryTerm,
2473-
final long globalCheckpoint,
2474-
final long maxSeqNoOfUpdatesOrDeletes,
2475-
final ActionListener<Releasable> onPermitAcquired,
2476-
final TimeValue timeout) {
2477-
verifyNotClosed();
2478-
updatePrimaryTermIfNeeded(opPrimaryTerm, globalCheckpoint);
2479-
2480-
ActionListener<Releasable> listener = createListener(onPermitAcquired, opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes);
2481-
indexShardOperationPermits.asyncBlockOperations(listener, timeout.duration(), timeout.timeUnit());
2465+
});
24822466
}
24832467

24842468
public int getActiveOperationsCount() {

server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -784,7 +784,7 @@ public void testSeqNoIsSetOnPrimary() throws Exception {
784784
new TestAction(Settings.EMPTY, "internal:testSeqNoIsSetOnPrimary", transportService, clusterService, shardStateAction,
785785
threadPool) {
786786
@Override
787-
protected IndexShard getIndexShard(ShardId shardId, String targetAllocationId) {
787+
protected IndexShard getIndexShard(ShardId shardId) {
788788
return shard;
789789
}
790790
};

0 commit comments

Comments
 (0)