Commit 14a9881

ReplicationOperation should fail gracefully

Problem: finishAsFailed could be called asynchronously in the middle of operations like runPostReplicationActions, which try to sync the translog. finishAsFailed immediately triggers the failure of the resultListener, which releases the index shard's primary operation permit. This means that runPostReplicationActions may try to sync the translog without holding an operation permit.

Solution: We refactor the pendingActions counter and resultListener of ReplicationOperation, replacing them with a RefCountingListener. With this change, asynchronous failures are aggregated, and the result listener is called once, after all in-flight sub-operations are done. For the specific error in issue elastic#97183, this means that a call to onNoLongerPrimary (which can happen if we fail to fail a replica shard or to mark it as stale) no longer releases the primary operation permit immediately, so the assertion in the translog sync is honored.

Fixes elastic#97183
Parent commit: e6e147e
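
The commit message relies on the RefCountingListener semantics: sub-operations acquire references, failures are recorded rather than propagated immediately, and the wrapped listener completes exactly once, after the last reference is released. As a rough, self-contained illustration of that idea (RefCountingSketch and its members are illustrative stand-ins, not the actual org.elasticsearch.action.support.RefCountingListener API), a minimal sketch in plain Java:

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Illustrative sketch of a ref-counted completion callback. Failures are
// recorded instead of completing the callback immediately, and the final
// callback runs exactly once, after the last reference is released.
final class RefCountingSketch implements AutoCloseable {
    private final AtomicInteger refs = new AtomicInteger(1); // initial ref, released by close()
    private final AtomicReference<Exception> failure = new AtomicReference<>();
    private final Runnable onSuccess;
    private final Consumer<Exception> onFailure;

    RefCountingSketch(Runnable onSuccess, Consumer<Exception> onFailure) {
        this.onSuccess = onSuccess;
        this.onFailure = onFailure;
    }

    Ref acquire() {
        refs.incrementAndGet();
        return new Ref();
    }

    final class Ref {
        void onResponse() {
            release();
        }

        void onFailure(Exception e) {
            failure.compareAndSet(null, e); // keep the first failure; the real listener aggregates all of them
            release();
        }
    }

    @Override
    public void close() {
        release(); // releases the initial ref, allowing completion
    }

    private void release() {
        if (refs.decrementAndGet() == 0) { // reached exactly once
            Exception e = failure.get();
            if (e == null) {
                onSuccess.run();
            } else {
                onFailure.accept(e);
            }
        }
    }
}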

File tree: 2 files changed (+69, -67 lines)

server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java

Lines changed: 68 additions & 65 deletions
@@ -15,6 +15,7 @@
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.UnavailableShardsException;
 import org.elasticsearch.action.support.ActiveShardCount;
+import org.elasticsearch.action.support.RefCountingListener;
 import org.elasticsearch.action.support.RetryableAction;
 import org.elasticsearch.action.support.TransportActions;
 import org.elasticsearch.cluster.action.shard.ShardStateAction;
@@ -56,16 +57,16 @@ public class ReplicationOperation<
     private final String opType;
     private final AtomicInteger totalShards = new AtomicInteger();
     /**
-     * The number of pending sub-operations in this operation. This is incremented when the following operations start and decremented when
-     * they complete:
+     * A {@link RefCountingListener} that encapsulates the pending sub-operations in this operation. The references are incremented when
+     * the following operations start and decremented when they complete:
      * <ul>
      * <li>The operation on the primary</li>
      * <li>The operation on each replica</li>
      * <li>Coordination of the operation as a whole. This prevents the operation from terminating early if we haven't started any replica
      * operations and the primary finishes.</li>
      * </ul>
      */
-    private final AtomicInteger pendingActions = new AtomicInteger();
+    private final RefCountingListener pendingActionsListener;
     private final AtomicInteger successfulShards = new AtomicInteger();
     private final Primary<Request, ReplicaRequest, PrimaryResultT> primary;
     private final Replicas<ReplicaRequest> replicasProxy;
@@ -74,9 +75,6 @@ public class ReplicationOperation<
     private final TimeValue retryTimeout;
     private final long primaryTerm;
 
-    // exposed for tests
-    private final ActionListener<PrimaryResultT> resultListener;
-
     private volatile PrimaryResultT primaryResult = null;
 
     private final List<ReplicationResponse.ShardInfo.Failure> shardReplicaFailures = Collections.synchronizedList(new ArrayList<>());
@@ -95,7 +93,16 @@ public ReplicationOperation(
     ) {
         this.replicasProxy = replicas;
         this.primary = primary;
-        this.resultListener = listener;
+        this.pendingActionsListener = new RefCountingListener(ActionListener.wrap((ignored) -> {
+            primaryResult.setShardInfo(
+                ReplicationResponse.ShardInfo.of(
+                    totalShards.get(),
+                    successfulShards.get(),
+                    shardReplicaFailures.toArray(ReplicationResponse.NO_FAILURES)
+                )
+            );
+            listener.onResponse(primaryResult);
+        }, listener::onFailure));
         this.logger = logger;
         this.threadPool = threadPool;
         this.request = request;
@@ -106,28 +113,37 @@ public ReplicationOperation(
     }
 
     public void execute() throws Exception {
-        final String activeShardCountFailure = checkActiveShardCount();
-        final ShardRouting primaryRouting = primary.routingEntry();
-        final ShardId primaryId = primaryRouting.shardId();
-        if (activeShardCountFailure != null) {
-            finishAsFailed(
-                new UnavailableShardsException(
-                    primaryId,
-                    "{} Timeout: [{}], request: [{}]",
-                    activeShardCountFailure,
-                    request.timeout(),
-                    request
+        try (pendingActionsListener) {
+            final String activeShardCountFailure = checkActiveShardCount();
+            final ShardRouting primaryRouting = primary.routingEntry();
+            final ShardId primaryId = primaryRouting.shardId();
+            if (activeShardCountFailure != null) {
+                pendingActionsListener.acquire()
+                    .onFailure(
+                        new UnavailableShardsException(
+                            primaryId,
+                            "{} Timeout: [{}], request: [{}]",
+                            activeShardCountFailure,
+                            request.timeout(),
+                            request
+                        )
+                    );
+                return;
+            }
+
+            totalShards.incrementAndGet();
+            var primaryCoordinationListener = pendingActionsListener.acquire(); // triggered when we finish all primary coordination
+            primary.perform(
+                request,
+                ActionListener.wrap(
+                    primaryResult -> { handlePrimaryResult(primaryResult, primaryCoordinationListener); },
+                    primaryCoordinationListener::onFailure
                 )
             );
-            return;
         }
-
-        totalShards.incrementAndGet();
-        pendingActions.incrementAndGet(); // increase by 1 until we finish all primary coordination
-        primary.perform(request, ActionListener.wrap(this::handlePrimaryResult, this::finishAsFailed));
     }
 
-    private void handlePrimaryResult(final PrimaryResultT primaryResult) {
+    private void handlePrimaryResult(final PrimaryResultT primaryResult, ActionListener<Void> primaryCoordinationListener) {
         this.primaryResult = primaryResult;
         final ReplicaRequest replicaRequest = primaryResult.replicaRequest();
         if (replicaRequest != null) {
@@ -136,11 +152,11 @@ private void handlePrimaryResult(final PrimaryResultT primaryResult) {
         }
         final ReplicationGroup replicationGroup = primary.getReplicationGroup();
 
-        pendingActions.incrementAndGet();
+        var primaryOperationCompleteListener = pendingActionsListener.acquire();
         replicasProxy.onPrimaryOperationComplete(
             replicaRequest,
             replicationGroup.getRoutingTable(),
-            ActionListener.wrap(ignored -> decPendingAndFinishIfNeeded(), exception -> {
+            ActionListener.wrap(ignored -> primaryOperationCompleteListener.onResponse(null), exception -> {
                 totalShards.incrementAndGet();
                 shardReplicaFailures.add(
                     new ReplicationResponse.ShardInfo.Failure(
@@ -151,7 +167,7 @@ private void handlePrimaryResult(final PrimaryResultT primaryResult) {
                         false
                     )
                 );
-                decPendingAndFinishIfNeeded();
+                primaryOperationCompleteListener.onResponse(null);
             })
         );
 
@@ -181,7 +197,7 @@ public void onResponse(Void aVoid) {
                     primary.routingEntry(),
                     primary::localCheckpoint,
                     primary::globalCheckpoint,
-                    () -> decPendingAndFinishIfNeeded()
+                    () -> primaryCoordinationListener.onResponse(null)
                 );
             }
 
@@ -193,20 +209,25 @@ public void onFailure(Exception e) {
                 // We update the checkpoints since a refresh might fail but the operations could be safely persisted, in the case that the
                 // fsync failed the local checkpoint won't advance and the engine will be marked as failed when the next indexing operation
                 // is appended into the translog.
-                updateCheckPoints(primary.routingEntry(), primary::localCheckpoint, primary::globalCheckpoint, () -> finishAsFailed(e));
+                updateCheckPoints(
+                    primary.routingEntry(),
+                    primary::localCheckpoint,
+                    primary::globalCheckpoint,
+                    () -> primaryCoordinationListener.onFailure(e)
+                );
             }
         });
     }
 
     private void markUnavailableShardsAsStale(ReplicaRequest replicaRequest, ReplicationGroup replicationGroup) {
         // if inSyncAllocationIds contains allocation ids of shards that don't exist in RoutingTable, mark copies as stale
         for (String allocationId : replicationGroup.getUnavailableInSyncShards()) {
-            pendingActions.incrementAndGet();
+            var staleCopyListener = pendingActionsListener.acquire();
             replicasProxy.markShardCopyAsStaleIfNeeded(
                 replicaRequest.shardId(),
                 allocationId,
                 primaryTerm,
-                ActionListener.wrap(r -> decPendingAndFinishIfNeeded(), ReplicationOperation.this::onNoLongerPrimary)
+                ActionListener.wrap(r -> staleCopyListener.onResponse(null), e -> onNoLongerPrimary(e, staleCopyListener))
             );
         }
     }
@@ -243,12 +264,17 @@ private void performOnReplica(
             logger.trace("[{}] sending op [{}] to replica {} for request [{}]", shard.shardId(), opType, shard, replicaRequest);
         }
         totalShards.incrementAndGet();
-        pendingActions.incrementAndGet();
+        var replicationPendingActionListener = pendingActionsListener.acquire();
         final ActionListener<ReplicaResponse> replicationListener = new ActionListener<>() {
             @Override
             public void onResponse(ReplicaResponse response) {
                 successfulShards.incrementAndGet();
-                updateCheckPoints(shard, response::localCheckpoint, response::globalCheckpoint, () -> decPendingAndFinishIfNeeded());
+                updateCheckPoints(
+                    shard,
+                    response::localCheckpoint,
+                    response::globalCheckpoint,
+                    () -> replicationPendingActionListener.onResponse(null)
+                );
             }
 
             @Override
@@ -282,7 +308,10 @@ public void onFailure(Exception replicaException) {
                     primaryTerm,
                     message,
                     replicaException,
-                    ActionListener.wrap(r -> decPendingAndFinishIfNeeded(), ReplicationOperation.this::onNoLongerPrimary)
+                    ActionListener.wrap(
+                        r -> replicationPendingActionListener.onResponse(null),
+                        e -> onNoLongerPrimary(e, replicationPendingActionListener)
+                    )
                 );
             }
 
@@ -369,13 +398,13 @@ public void onAfter() {
         }
     }
 
-    private void onNoLongerPrimary(Exception failure) {
+    private void onNoLongerPrimary(Exception failure, ActionListener<Void> listener) {
        final Throwable cause = ExceptionsHelper.unwrapCause(failure);
        final boolean nodeIsClosing = cause instanceof NodeClosedException;
        if (nodeIsClosing) {
            // We prefer not to fail the primary to avoid unnecessary warning log
            // when the node with the primary shard is gracefully shutting down.
-            finishAsFailed(
+            listener.onFailure(
                new RetryOnPrimaryException(
                    primary.routingEntry().shardId(),
                    String.format(
@@ -398,7 +427,7 @@ protected void doRun() {
                     primary.routingEntry()
                 );
                 primary.failShard(message, failure);
-                finishAsFailed(new RetryOnPrimaryException(primary.routingEntry().shardId(), message, failure));
+                listener.onFailure(new RetryOnPrimaryException(primary.routingEntry().shardId(), message, failure));
             }
 
             @Override
@@ -411,7 +440,7 @@ public void onFailure(Exception e) {
                 e.addSuppressed(failure);
                 assert false : e;
                 logger.error(() -> "unexpected failure while failing primary [" + primary.routingEntry() + "]", e);
-                finishAsFailed(
+                listener.onFailure(
                     new RetryOnPrimaryException(
                         primary.routingEntry().shardId(),
                         String.format(Locale.ROOT, "unexpected failure while failing primary [%s]", primary.routingEntry()),
@@ -461,32 +490,6 @@ protected String checkActiveShardCount() {
         }
     }
 
-    private void decPendingAndFinishIfNeeded() {
-        assert pendingActions.get() > 0 : "pending action count goes below 0 for request [" + request + "]";
-        if (pendingActions.decrementAndGet() == 0) {
-            finish();
-        }
-    }
-
-    private void finish() {
-        if (finished.compareAndSet(false, true)) {
-            primaryResult.setShardInfo(
-                ReplicationResponse.ShardInfo.of(
-                    totalShards.get(),
-                    successfulShards.get(),
-                    shardReplicaFailures.toArray(ReplicationResponse.NO_FAILURES)
-                )
-            );
-            resultListener.onResponse(primaryResult);
-        }
-    }
-
-    private void finishAsFailed(Exception exception) {
-        if (finished.compareAndSet(false, true)) {
-            resultListener.onFailure(exception);
-        }
-    }
-
     /**
      * An encapsulation of an operation that is to be performed on the primary shard
      */
@@ -693,7 +696,7 @@ public interface PrimaryResult<RequestT extends ReplicationRequest<RequestT>> {
 
     /**
      * Run actions to be triggered post replication
-     * @param listener calllback that is invoked after post replication actions have completed
+     * @param listener callback that is invoked after post replication actions have completed
      * */
    void runPostReplicationActions(ActionListener<Void> listener);
 }

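The behavioral change in execute() can be demonstrated with the sketch from above: a failure reported by one sub-operation no longer completes the result callback while other sub-operations are still running, because completion waits for the try-with-resources close() to release the initial reference. A hypothetical usage (names are illustrative):

public class RefCountingDemo {
    public static void main(String[] args) {
        try (RefCountingSketch pending = new RefCountingSketch(
            () -> System.out.println("result listener: success"),
            e -> System.out.println("result listener: failure: " + e.getMessage())
        )) {
            RefCountingSketch.Ref coordination = pending.acquire();
            RefCountingSketch.Ref replica = pending.acquire();

            // The failure is recorded, but the result listener is NOT called yet, so
            // resources tied to its completion (such as the primary operation permit)
            // stay held while other sub-operations finish.
            coordination.onFailure(new RuntimeException("no longer primary"));

            // Remaining work, e.g. post-replication actions, can still complete safely.
            replica.onResponse();
        } // close() releases the initial ref; only now does the failure reach the listener
    }
}
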
server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 1 addition & 2 deletions
@@ -3860,8 +3860,7 @@ public int getActiveOperationsCount() {
      * listener handles all exception cases internally.
      */
     public final void syncAfterWrite(Translog.Location location, Consumer<Exception> syncListener) {
-        // TODO AwaitsFix https://github.com/elastic/elasticsearch/issues/97183
-        // assert indexShardOperationPermits.getActiveOperationsCount() != 0;
+        assert indexShardOperationPermits.getActiveOperationsCount() != 0;
         verifyNotClosed();
         getEngine().asyncEnsureTranslogSynced(location, syncListener);
     }
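
With the ReplicationOperation refactoring above, the translog sync can no longer run after the primary operation permit has been released, so the previously muted assertion is restored. The invariant it checks can be pictured with a hypothetical sketch (PermitGuardedSync and its semaphore bookkeeping are illustrative, not the real IndexShardOperationPermits implementation):

import java.util.concurrent.Semaphore;

// Illustrative invariant: work that touches the translog must run while the
// caller still holds at least one operation permit.
final class PermitGuardedSync {
    private static final int TOTAL_PERMITS = Integer.MAX_VALUE;
    private final Semaphore permits = new Semaphore(TOTAL_PERMITS);

    int getActiveOperationsCount() {
        return TOTAL_PERMITS - permits.availablePermits();
    }

    void runUnderPermit(Runnable work) throws InterruptedException {
        permits.acquire();
        try {
            work.run(); // the permit stays held for the whole operation
        } finally {
            permits.release();
        }
    }

    void syncAfterWrite() {
        // Mirrors the re-enabled assertion: fail fast if the caller's permit was
        // already released, e.g. because the result listener completed too early.
        assert getActiveOperationsCount() != 0;
        // ... ensure the translog is synced up to the given location ...
    }
}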
