
Commit c7c672f

ReplicationOperation should fail gracefully
Problem: finishAsFailed could be called asynchronously in the middle of operations like runPostReplicationActions, which try to sync the translog. finishAsFailed immediately triggers the failure of the resultListener, which releases the index shard's primary operation permit. This means that runPostReplicationActions may try to sync the translog without an operation permit.

Solution: We refactor the pendingActions and resultListener infrastructure of ReplicationOperation by replacing them with a RefCountingListener. This way, if there are async failures, they are aggregated and the result listener is called once, after all mid-way operations are done. For the specific error reported in issue elastic#97183, this means that a call to onNoLongerPrimary (which can happen if we fail to fail a replica shard or mark it as stale) will not immediately release the primary operation permit, and the assertion in the translog sync will be honored.

Fixes elastic#97183
1 parent e6e147e commit c7c672f
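
The core of the fix is a reference-counting completion pattern: every pending sub-operation acquires a listener from a single RefCountingListener, and the wrapped result listener fires exactly once, only after every acquired listener has completed, with failures aggregated instead of short-circuiting the operation. Below is a minimal sketch of that pattern, assuming only the calls visible in the diff (a constructor taking a delegate ActionListener, acquire(), and try-with-resources close); the class name, the fixed loop count, and the thread-based sub-operations are illustrative and not part of the commit.

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.RefCountingListener;

class RefCountingSketch {
    // Hypothetical driver, not part of the commit: 'delegate' is completed exactly once,
    // after all acquired references have been released.
    static void runSubOperations(ActionListener<Void> delegate) {
        try (RefCountingListener refs = new RefCountingListener(delegate)) {
            for (int i = 0; i < 3; i++) {
                ActionListener<Void> ref = refs.acquire(); // one reference per pending sub-operation
                // Simulated async sub-operation; calling ref.onFailure(e) here would be
                // aggregated rather than completing 'delegate' immediately.
                new Thread(() -> ref.onResponse(null)).start();
            }
        } // closes the coordinating reference held by the try block; 'delegate' fires once every ref is done
    }
}

In ReplicationOperation this is what keeps an asynchronous onNoLongerPrimary failure from releasing the primary operation permit while runPostReplicationActions is still syncing the translog.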

File tree

2 files changed: +70 −69 lines

server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java

Lines changed: 69 additions & 67 deletions
@@ -15,6 +15,7 @@
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.UnavailableShardsException;
 import org.elasticsearch.action.support.ActiveShardCount;
+import org.elasticsearch.action.support.RefCountingListener;
 import org.elasticsearch.action.support.RetryableAction;
 import org.elasticsearch.action.support.TransportActions;
 import org.elasticsearch.cluster.action.shard.ShardStateAction;
@@ -56,16 +57,16 @@ public class ReplicationOperation<
     private final String opType;
     private final AtomicInteger totalShards = new AtomicInteger();
     /**
-     * The number of pending sub-operations in this operation. This is incremented when the following operations start and decremented when
-     * they complete:
+     * A {@link RefCountingListener} that encapsulates the pending sub-operations in this operation. A new listener is acquired when the
+     * following operations start and triggered when they complete:
      * <ul>
      * <li>The operation on the primary</li>
      * <li>The operation on each replica</li>
      * <li>Coordination of the operation as a whole. This prevents the operation from terminating early if we haven't started any replica
     * operations and the primary finishes.</li>
      * </ul>
      */
-    private final AtomicInteger pendingActions = new AtomicInteger();
+    private final RefCountingListener pendingActionsListener;
     private final AtomicInteger successfulShards = new AtomicInteger();
     private final Primary<Request, ReplicaRequest, PrimaryResultT> primary;
     private final Replicas<ReplicaRequest> replicasProxy;
@@ -74,9 +75,6 @@ public class ReplicationOperation<
     private final TimeValue retryTimeout;
     private final long primaryTerm;

-    // exposed for tests
-    private final ActionListener<PrimaryResultT> resultListener;
-
     private volatile PrimaryResultT primaryResult = null;

     private final List<ReplicationResponse.ShardInfo.Failure> shardReplicaFailures = Collections.synchronizedList(new ArrayList<>());
@@ -95,7 +93,16 @@ public ReplicationOperation(
     ) {
         this.replicasProxy = replicas;
         this.primary = primary;
-        this.resultListener = listener;
+        this.pendingActionsListener = new RefCountingListener(ActionListener.wrap((ignored) -> {
+            primaryResult.setShardInfo(
+                ReplicationResponse.ShardInfo.of(
+                    totalShards.get(),
+                    successfulShards.get(),
+                    shardReplicaFailures.toArray(ReplicationResponse.NO_FAILURES)
+                )
+            );
+            listener.onResponse(primaryResult);
+        }, listener::onFailure));
         this.logger = logger;
         this.threadPool = threadPool;
         this.request = request;
@@ -106,28 +113,33 @@ public ReplicationOperation(
     }

     public void execute() throws Exception {
-        final String activeShardCountFailure = checkActiveShardCount();
-        final ShardRouting primaryRouting = primary.routingEntry();
-        final ShardId primaryId = primaryRouting.shardId();
-        if (activeShardCountFailure != null) {
-            finishAsFailed(
-                new UnavailableShardsException(
-                    primaryId,
-                    "{} Timeout: [{}], request: [{}]",
-                    activeShardCountFailure,
-                    request.timeout(),
-                    request
-                )
-            );
-            return;
-        }
+        try (pendingActionsListener) {
+            final String activeShardCountFailure = checkActiveShardCount();
+            final ShardRouting primaryRouting = primary.routingEntry();
+            final ShardId primaryId = primaryRouting.shardId();
+            if (activeShardCountFailure != null) {
+                pendingActionsListener.acquire()
+                    .onFailure(
+                        new UnavailableShardsException(
+                            primaryId,
+                            "{} Timeout: [{}], request: [{}]",
+                            activeShardCountFailure,
+                            request.timeout(),
+                            request
+                        )
+                    );
+                return;
+            }

-        totalShards.incrementAndGet();
-        pendingActions.incrementAndGet(); // increase by 1 until we finish all primary coordination
-        primary.perform(request, ActionListener.wrap(this::handlePrimaryResult, this::finishAsFailed));
+            totalShards.incrementAndGet();
+            var primaryCoordinationPendingActionListener = pendingActionsListener.acquire(); // triggered when we finish all coordination
+            primary.perform(request, ActionListener.wrap(primaryResult -> {
+                handlePrimaryResult(primaryResult, primaryCoordinationPendingActionListener);
+            }, primaryCoordinationPendingActionListener::onFailure));
+        }
     }

-    private void handlePrimaryResult(final PrimaryResultT primaryResult) {
+    private void handlePrimaryResult(final PrimaryResultT primaryResult, ActionListener<Void> primaryCoordinationPendingActionListener) {
         this.primaryResult = primaryResult;
         final ReplicaRequest replicaRequest = primaryResult.replicaRequest();
         if (replicaRequest != null) {
@@ -136,11 +148,11 @@ private void handlePrimaryResult(final PrimaryResultT primaryResult) {
         }
         final ReplicationGroup replicationGroup = primary.getReplicationGroup();

-        pendingActions.incrementAndGet();
+        var primaryOperationPendingActionListener = pendingActionsListener.acquire();
         replicasProxy.onPrimaryOperationComplete(
             replicaRequest,
             replicationGroup.getRoutingTable(),
-            ActionListener.wrap(ignored -> decPendingAndFinishIfNeeded(), exception -> {
+            ActionListener.wrap(ignored -> primaryOperationPendingActionListener.onResponse(null), exception -> {
                 totalShards.incrementAndGet();
                 shardReplicaFailures.add(
                     new ReplicationResponse.ShardInfo.Failure(
@@ -151,7 +163,7 @@ private void handlePrimaryResult(final PrimaryResultT primaryResult) {
                         false
                     )
                 );
-                decPendingAndFinishIfNeeded();
+                primaryOperationPendingActionListener.onResponse(null);
             })
         );

@@ -181,7 +193,7 @@ public void onResponse(Void aVoid) {
                     primary.routingEntry(),
                     primary::localCheckpoint,
                     primary::globalCheckpoint,
-                    () -> decPendingAndFinishIfNeeded()
+                    () -> primaryCoordinationPendingActionListener.onResponse(null)
                 );
             }

@@ -193,20 +205,28 @@ public void onFailure(Exception e) {
                 // We update the checkpoints since a refresh might fail but the operations could be safely persisted, in the case that the
                 // fsync failed the local checkpoint won't advance and the engine will be marked as failed when the next indexing operation
                 // is appended into the translog.
-                updateCheckPoints(primary.routingEntry(), primary::localCheckpoint, primary::globalCheckpoint, () -> finishAsFailed(e));
+                updateCheckPoints(
+                    primary.routingEntry(),
+                    primary::localCheckpoint,
+                    primary::globalCheckpoint,
+                    () -> primaryCoordinationPendingActionListener.onFailure(e)
+                );
             }
         });
     }

     private void markUnavailableShardsAsStale(ReplicaRequest replicaRequest, ReplicationGroup replicationGroup) {
         // if inSyncAllocationIds contains allocation ids of shards that don't exist in RoutingTable, mark copies as stale
         for (String allocationId : replicationGroup.getUnavailableInSyncShards()) {
-            pendingActions.incrementAndGet();
+            var staleCopyPendingActionListener = pendingActionsListener.acquire();
             replicasProxy.markShardCopyAsStaleIfNeeded(
                 replicaRequest.shardId(),
                 allocationId,
                 primaryTerm,
-                ActionListener.wrap(r -> decPendingAndFinishIfNeeded(), ReplicationOperation.this::onNoLongerPrimary)
+                ActionListener.wrap(
+                    r -> staleCopyPendingActionListener.onResponse(null),
+                    e -> onNoLongerPrimary(e, staleCopyPendingActionListener)
+                )
             );
         }
     }
@@ -243,12 +263,17 @@ private void performOnReplica(
             logger.trace("[{}] sending op [{}] to replica {} for request [{}]", shard.shardId(), opType, shard, replicaRequest);
         }
         totalShards.incrementAndGet();
-        pendingActions.incrementAndGet();
+        var replicationPendingActionListener = pendingActionsListener.acquire();
         final ActionListener<ReplicaResponse> replicationListener = new ActionListener<>() {
             @Override
             public void onResponse(ReplicaResponse response) {
                 successfulShards.incrementAndGet();
-                updateCheckPoints(shard, response::localCheckpoint, response::globalCheckpoint, () -> decPendingAndFinishIfNeeded());
+                updateCheckPoints(
+                    shard,
+                    response::localCheckpoint,
+                    response::globalCheckpoint,
+                    () -> replicationPendingActionListener.onResponse(null)
+                );
             }

             @Override
@@ -282,7 +307,10 @@ public void onFailure(Exception replicaException) {
                     primaryTerm,
                     message,
                     replicaException,
-                    ActionListener.wrap(r -> decPendingAndFinishIfNeeded(), ReplicationOperation.this::onNoLongerPrimary)
+                    ActionListener.wrap(
+                        r -> replicationPendingActionListener.onResponse(null),
+                        e -> onNoLongerPrimary(e, replicationPendingActionListener)
+                    )
                 );
             }

@@ -369,13 +397,13 @@ public void onAfter() {
         }
     }

-    private void onNoLongerPrimary(Exception failure) {
+    private void onNoLongerPrimary(Exception failure, ActionListener<Void> pendingActionlistener) {
         final Throwable cause = ExceptionsHelper.unwrapCause(failure);
         final boolean nodeIsClosing = cause instanceof NodeClosedException;
         if (nodeIsClosing) {
             // We prefer not to fail the primary to avoid unnecessary warning log
             // when the node with the primary shard is gracefully shutting down.
-            finishAsFailed(
+            pendingActionlistener.onFailure(
                 new RetryOnPrimaryException(
                     primary.routingEntry().shardId(),
                     String.format(
@@ -398,7 +426,7 @@ protected void doRun() {
                        primary.routingEntry()
                    );
                    primary.failShard(message, failure);
-                    finishAsFailed(new RetryOnPrimaryException(primary.routingEntry().shardId(), message, failure));
+                    pendingActionlistener.onFailure(new RetryOnPrimaryException(primary.routingEntry().shardId(), message, failure));
                }

                @Override
@@ -411,7 +439,7 @@ public void onFailure(Exception e) {
                    e.addSuppressed(failure);
                    assert false : e;
                    logger.error(() -> "unexpected failure while failing primary [" + primary.routingEntry() + "]", e);
-                    finishAsFailed(
+                    pendingActionlistener.onFailure(
                        new RetryOnPrimaryException(
                            primary.routingEntry().shardId(),
                            String.format(Locale.ROOT, "unexpected failure while failing primary [%s]", primary.routingEntry()),
@@ -461,32 +489,6 @@ protected String checkActiveShardCount() {
         }
     }

-    private void decPendingAndFinishIfNeeded() {
-        assert pendingActions.get() > 0 : "pending action count goes below 0 for request [" + request + "]";
-        if (pendingActions.decrementAndGet() == 0) {
-            finish();
-        }
-    }
-
-    private void finish() {
-        if (finished.compareAndSet(false, true)) {
-            primaryResult.setShardInfo(
-                ReplicationResponse.ShardInfo.of(
-                    totalShards.get(),
-                    successfulShards.get(),
-                    shardReplicaFailures.toArray(ReplicationResponse.NO_FAILURES)
-                )
-            );
-            resultListener.onResponse(primaryResult);
-        }
-    }
-
-    private void finishAsFailed(Exception exception) {
-        if (finished.compareAndSet(false, true)) {
-            resultListener.onFailure(exception);
-        }
-    }
-
     /**
      * An encapsulation of an operation that is to be performed on the primary shard
      */
@@ -693,7 +695,7 @@ public interface PrimaryResult<RequestT extends ReplicationRequest<RequestT>> {

     /**
      * Run actions to be triggered post replication
-     * @param listener calllback that is invoked after post replication actions have completed
+     * @param listener callback that is invoked after post replication actions have completed
      * */
     void runPostReplicationActions(ActionListener<Void> listener);
 }

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 1 addition & 2 deletions
@@ -3860,8 +3860,7 @@ public int getActiveOperationsCount() {
      * listener handles all exception cases internally.
      */
     public final void syncAfterWrite(Translog.Location location, Consumer<Exception> syncListener) {
-        // TODO AwaitsFix https://github.com/elastic/elasticsearch/issues/97183
-        // assert indexShardOperationPermits.getActiveOperationsCount() != 0;
+        assert indexShardOperationPermits.getActiveOperationsCount() != 0;
         verifyNotClosed();
         getEngine().asyncEnsureTranslogSynced(location, syncListener);
     }
